kratos/contrib/registry/discovery/discovery.go

477 lines
9.9 KiB

package discovery
import (
"context"
"fmt"
"math/rand"
"net/url"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/go-resty/resty/v2"
"github.com/pkg/errors"
"github.com/go-kratos/kratos/v2/log"
)
type Discovery struct {
config *Config
once sync.Once
ctx context.Context
cancelFunc context.CancelFunc
httpClient *resty.Client
node atomic.Value
nodeIdx uint64
mutex sync.RWMutex
apps map[string]*appInfo
registry map[string]struct{}
lastHost string
cancelPolls context.CancelFunc
}
type appInfo struct {
resolver map[*Resolve]struct{}
zoneIns atomic.Value
lastTs int64 // latest timestamp
}
// New construct a Discovery instance which implements registry.Registrar,
// registry.Discovery and registry.Watcher.
func New(c *Config) *Discovery {
if c == nil {
c = new(Config)
}
if err := fixConfig(c); err != nil {
panic(err)
}
ctx, cancel := context.WithCancel(context.Background())
d := &Discovery{
config: c,
ctx: ctx,
cancelFunc: cancel,
apps: map[string]*appInfo{},
registry: map[string]struct{}{},
}
d.httpClient = resty.New().
SetTimeout(40 * time.Second)
// Discovery self found and watch
r := d.resolveBuild(_discoveryAppID)
event := r.Watch()
_, ok := <-event
if !ok {
panic("Discovery watch self failed")
}
discoveryIns, ok := r.fetch(context.Background())
if ok {
d.newSelf(discoveryIns.Instances)
}
go d.selfProc(r, event)
return d
}
// Close stop all running process including Discovery and register
func (d *Discovery) Close() error {
d.cancelFunc()
return nil
}
// selfProc start a goroutine to refresh Discovery self registration information.
func (d *Discovery) selfProc(resolver *Resolve, event <-chan struct{}) {
for {
_, ok := <-event
if !ok {
return
}
zones, ok := resolver.fetch(context.Background())
if ok {
d.newSelf(zones.Instances)
}
}
}
// newSelf
func (d *Discovery) newSelf(zones map[string][]*discoveryInstance) {
ins, ok := zones[d.config.Zone]
if !ok {
return
}
var nodes []string
for _, in := range ins {
for _, addr := range in.Addrs {
u, err := url.Parse(addr)
if err == nil && u.Scheme == "http" {
nodes = append(nodes, u.Host)
}
}
}
// diff old nodes
var olds int
for _, n := range nodes {
if node, ok := d.node.Load().([]string); ok {
for _, o := range node {
if o == n {
olds++
break
}
}
}
}
if len(nodes) == olds {
return
}
rand.Shuffle(len(nodes), func(i, j int) {
nodes[i], nodes[j] = nodes[j], nodes[i]
})
d.node.Store(nodes)
}
// resolveBuild Discovery resolver builder.
func (d *Discovery) resolveBuild(appID string) *Resolve {
r := &Resolve{
id: appID,
d: d,
event: make(chan struct{}, 1),
}
d.mutex.Lock()
app, ok := d.apps[appID]
if !ok {
app = &appInfo{
resolver: make(map[*Resolve]struct{}),
}
d.apps[appID] = app
cancel := d.cancelPolls
if cancel != nil {
cancel()
}
}
app.resolver[r] = struct{}{}
d.mutex.Unlock()
if ok {
select {
case r.event <- struct{}{}:
default:
}
}
log.Debugf("Discovery: AddWatch(%s) already watch(%v)", appID, ok)
d.once.Do(func() {
go d.serverProc()
})
return r
}
func (d *Discovery) serverProc() {
defer log.Debug("Discovery serverProc quit")
var (
retry int
ctx context.Context
cancel context.CancelFunc
)
ticker := time.NewTicker(time.Minute * 30)
defer ticker.Stop()
for {
if ctx == nil {
ctx, cancel = context.WithCancel(d.ctx)
d.mutex.Lock()
d.cancelPolls = cancel
d.mutex.Unlock()
}
select {
case <-d.ctx.Done():
return
case <-ticker.C:
d.switchNode()
default:
}
apps, err := d.polls(ctx)
if err != nil {
d.switchNode()
if ctx.Err() == context.Canceled {
ctx = nil
continue
}
time.Sleep(time.Second)
retry++
continue
}
retry = 0
d.broadcast(apps)
}
}
func (d *Discovery) pickNode() string {
nodes, ok := d.node.Load().([]string)
if !ok || len(nodes) == 0 {
return d.config.Nodes[rand.Intn(len(d.config.Nodes))]
}
return nodes[atomic.LoadUint64(&d.nodeIdx)%uint64(len(nodes))]
}
func (d *Discovery) switchNode() {
atomic.AddUint64(&d.nodeIdx, 1)
}
// renew an instance with Discovery
func (d *Discovery) renew(ctx context.Context, ins *discoveryInstance) (err error) {
d.mutex.RLock()
c := d.config
d.mutex.RUnlock()
res := new(discoveryCommonResp)
uri := fmt.Sprintf(_renewURL, d.pickNode())
// construct parameters to renew
p := newParams(d.config)
p.Set(_paramKeyAppID, ins.AppID)
// send request to Discovery server.
if _, err = d.httpClient.R().
SetContext(ctx).
SetQueryParamsFromValues(p).
SetResult(&res).
Post(uri); err != nil {
d.switchNode()
log.Errorf("Discovery: renew client.Get(%v) env(%s) appid(%s) hostname(%s) error(%v)",
uri, c.Env, ins.AppID, c.Host, err)
return
}
if res.Code != _codeOK {
err = fmt.Errorf("discovery.renew failed ErrorCode: %d", res.Code)
if res.Code == _codeNotFound {
if err = d.register(ctx, ins); err != nil {
err = errors.Wrap(err, "Discovery.renew instance, and failed to register ins")
}
return
}
log.Errorf(
"Discovery: renew client.Get(%v) env(%s) appid(%s) hostname(%s) code(%v)",
uri, c.Env, ins.AppID, c.Host, res.Code,
)
}
return
}
// cancel Remove the registered instance from Discovery
func (d *Discovery) cancel(ins *discoveryInstance) (err error) {
d.mutex.RLock()
config := d.config
d.mutex.RUnlock()
res := new(discoveryCommonResp)
uri := fmt.Sprintf(_cancelURL, d.pickNode())
p := newParams(d.config)
p.Set(_paramKeyAppID, ins.AppID)
// request
// send request to Discovery server.
if _, err = d.httpClient.R().
SetContext(context.TODO()).
SetQueryParamsFromValues(p).
SetResult(&res).
Post(uri); err != nil {
d.switchNode()
log.Errorf("Discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) error(%v)",
uri, config.Env, ins.AppID, config.Host, err)
return
}
// handle response error
if res.Code != _codeOK {
if res.Code == _codeNotFound {
return nil
}
log.Warnf("Discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) code(%v)",
uri, config.Env, ins.AppID, config.Host, res.Code)
err = fmt.Errorf("ErrorCode: %d", res.Code)
return
}
return
}
func (d *Discovery) broadcast(apps map[string]*disInstancesInfo) {
for appID, v := range apps {
var count int
// v maybe nil in old version(less than v1.1) Discovery, check in case of panic
if v == nil {
continue
}
for zone, ins := range v.Instances {
if len(ins) == 0 {
delete(v.Instances, zone)
}
count += len(ins)
}
if count == 0 {
continue
}
d.mutex.RLock()
app, ok := d.apps[appID]
d.mutex.RUnlock()
if ok {
app.lastTs = v.LastTs
app.zoneIns.Store(v)
d.mutex.RLock()
for rs := range app.resolver {
select {
case rs.event <- struct{}{}:
default:
}
}
d.mutex.RUnlock()
}
}
}
func (d *Discovery) polls(ctx context.Context) (apps map[string]*disInstancesInfo, err error) {
var (
lastTss = make([]int64, 0, 4)
appIDs = make([]string, 0, 16)
host = d.pickNode()
changed bool
)
if host != d.lastHost {
d.lastHost = host
changed = true
}
d.mutex.RLock()
config := d.config
for k, v := range d.apps {
if changed {
v.lastTs = 0
}
appIDs = append(appIDs, k)
lastTss = append(lastTss, v.lastTs)
}
d.mutex.RUnlock()
// if there is no app, polls just return.
if len(appIDs) == 0 {
return
}
uri := fmt.Sprintf(_pollURL, host)
res := new(discoveryPollsResp)
// params
p := newParams(nil)
p.Set(_paramKeyEnv, config.Env)
p.Set(_paramKeyHostname, config.Host)
for _, appID := range appIDs {
p.Add(_paramKeyAppID, appID)
}
for _, ts := range lastTss {
p.Add("latest_timestamp", strconv.FormatInt(ts, 10))
}
// request
reqURI := uri + "?" + p.Encode()
if _, err = d.httpClient.R().
SetContext(ctx).
SetQueryParamsFromValues(p).
SetResult(res).Get(uri); err != nil {
d.switchNode()
log.Errorf("Discovery: client.Get(%s) error(%+v)", reqURI, err)
return nil, err
}
if res.Code != _codeOK {
if res.Code != _codeNotModified {
log.Errorf("Discovery: client.Get(%s) get error code(%d)", reqURI, res.Code)
}
err = fmt.Errorf("discovery.polls failed ErrCode: %d", res.Code)
return
}
for _, app := range res.Data {
if app.LastTs == 0 {
err = ErrServerError
log.Errorf("Discovery: client.Get(%s) latest_timestamp is 0, instances:(%+v)", reqURI, res.Data)
return
}
}
log.Debugf("Discovery: successfully polls(%s) instances (%+v)", reqURI, res.Data)
apps = res.Data
return
}
// Resolve Discovery resolver.
type Resolve struct {
id string
event chan struct{}
d *Discovery
}
// Watch instance.
func (r *Resolve) Watch() <-chan struct{} {
return r.event
}
// fetch resolver instance.
func (r *Resolve) fetch(ctx context.Context) (ins *disInstancesInfo, ok bool) {
r.d.mutex.RLock()
app, ok := r.d.apps[r.id]
r.d.mutex.RUnlock()
if ok {
var appIns *disInstancesInfo
appIns, ok = app.zoneIns.Load().(*disInstancesInfo)
if !ok {
return
}
ins = new(disInstancesInfo)
ins.LastTs = appIns.LastTs
ins.Scheduler = appIns.Scheduler
ins.Instances = make(map[string][]*discoveryInstance, len(appIns.Instances))
for zone, in := range appIns.Instances {
ins.Instances[zone] = in
}
//if r.opt.Filter != nil {
// ins.Instances = r.opt.Filter(appIns.Instances)
//} else {
// ins.Instances = make(map[string][]*discoveryInstance)
// for zone, in := range appIns.Instances {
// ins.Instances[zone] = in
// }
//}
//if r.opt.scheduler != nil {
// ins.Instances[r.opt.ClientZone] = r.opt.scheduler(ins)
//}
//if r.opt.Subset != nil && r.opt.SubsetSize != 0 {
// for zone, inss := range ins.Instances {
// ins.Instances[zone] = r.opt.Subset(inss, r.opt.SubsetSize)
// }
//}
}
return
}
// Close resolver
func (r *Resolve) Close() error {
r.d.mutex.Lock()
if app, ok := r.d.apps[r.id]; ok && len(app.resolver) != 0 {
delete(app.resolver, r)
// TODO: delete app from builder
}
r.d.mutex.Unlock()
return nil
}