package etcd import ( "context" "encoding/json" "errors" "flag" "fmt" "os" "strings" "sync" "sync/atomic" "time" "go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/client/v3" "google.golang.org/grpc" "github.com/go-kratos/kratos/pkg/log" "github.com/go-kratos/kratos/pkg/naming" ) var ( //etcdPrefix is a etcd globe key prefix endpoints string etcdPrefix string //Time units is second registerTTL = 90 defaultDialTimeout = 30 ) var ( _once sync.Once _builder naming.Builder //ErrDuplication is a register duplication err ErrDuplication = errors.New("etcd: instance duplicate registration") ) func init() { addFlag(flag.CommandLine) } func addFlag(fs *flag.FlagSet) { // env fs.StringVar(&endpoints, "etcd.endpoints", os.Getenv("ETCD_ENDPOINTS"), "etcd.endpoints is etcd endpoints. value: 127.0.0.1:2379,127.0.0.2:2379 etc.") fs.StringVar(&etcdPrefix, "etcd.prefix", defaultString("ETCD_PREFIX", "kratos_etcd"), "etcd globe key prefix or use ETCD_PREFIX env variable. value etcd_prefix etc.") } func defaultString(env, value string) string { v := os.Getenv(env) if v == "" { return value } return v } // Builder return default etcd resolver builder. func Builder(c *clientv3.Config) naming.Builder { _once.Do(func() { _builder, _ = New(c) }) return _builder } // Build register resolver into default etcd. func Build(c *clientv3.Config, id string) naming.Resolver { return Builder(c).Build(id) } // EtcdBuilder is a etcd clientv3 EtcdBuilder type EtcdBuilder struct { cli *clientv3.Client ctx context.Context cancelFunc context.CancelFunc mutex sync.RWMutex apps map[string]*appInfo registry map[string]struct{} } type appInfo struct { resolver map[*Resolve]struct{} ins atomic.Value e *EtcdBuilder once sync.Once } // Resolve etch resolver. type Resolve struct { id string event chan struct{} e *EtcdBuilder opt *naming.BuildOptions } // New is new a etcdbuilder func New(c *clientv3.Config) (e *EtcdBuilder, err error) { if c == nil { if endpoints == "" { panic(fmt.Errorf("invalid etcd config endpoints:%+v", endpoints)) } c = &clientv3.Config{ Endpoints: strings.Split(endpoints, ","), DialTimeout: time.Second * time.Duration(defaultDialTimeout), DialOptions: []grpc.DialOption{grpc.WithBlock()}, } } cli, err := clientv3.New(*c) if err != nil { return nil, err } ctx, cancel := context.WithCancel(context.Background()) e = &EtcdBuilder{ cli: cli, ctx: ctx, cancelFunc: cancel, apps: map[string]*appInfo{}, registry: map[string]struct{}{}, } return } // Build disovery resovler builder. func (e *EtcdBuilder) Build(appid string, opts ...naming.BuildOpt) naming.Resolver { r := &Resolve{ id: appid, e: e, event: make(chan struct{}, 1), opt: new(naming.BuildOptions), } e.mutex.Lock() app, ok := e.apps[appid] if !ok { app = &appInfo{ resolver: make(map[*Resolve]struct{}), e: e, } e.apps[appid] = app } app.resolver[r] = struct{}{} e.mutex.Unlock() if ok { select { case r.event <- struct{}{}: default: } } app.once.Do(func() { go app.watch(appid) log.Info("etcd: AddWatch(%s) already watch(%v)", appid, ok) }) return r } // Scheme return etcd's scheme func (e *EtcdBuilder) Scheme() string { return "etcd" } // Register is register instance func (e *EtcdBuilder) Register(ctx context.Context, ins *naming.Instance) (cancelFunc context.CancelFunc, err error) { e.mutex.Lock() if _, ok := e.registry[ins.AppID]; ok { err = ErrDuplication } else { e.registry[ins.AppID] = struct{}{} } e.mutex.Unlock() if err != nil { return } ctx, cancel := context.WithCancel(e.ctx) if err = e.register(ctx, ins); err != nil { e.mutex.Lock() delete(e.registry, ins.AppID) e.mutex.Unlock() cancel() return } ch := make(chan struct{}, 1) cancelFunc = context.CancelFunc(func() { cancel() <-ch }) go func() { ticker := time.NewTicker(time.Duration(registerTTL/3) * time.Second) defer ticker.Stop() for { select { case <-ticker.C: _ = e.register(ctx, ins) case <-ctx.Done(): _ = e.unregister(ins) ch <- struct{}{} return } } }() return } //注册和续约公用一个操作 func (e *EtcdBuilder) register(ctx context.Context, ins *naming.Instance) (err error) { prefix := e.keyPrefix(ins) val, _ := json.Marshal(ins) ttlResp, err := e.cli.Grant(context.TODO(), int64(registerTTL)) if err != nil { log.Error("etcd: register client.Grant(%v) error(%v)", registerTTL, err) return err } _, err = e.cli.Put(ctx, prefix, string(val), clientv3.WithLease(ttlResp.ID)) if err != nil { log.Error("etcd: register client.Put(%v) appid(%s) hostname(%s) error(%v)", prefix, ins.AppID, ins.Hostname, err) return err } return nil } func (e *EtcdBuilder) unregister(ins *naming.Instance) (err error) { prefix := e.keyPrefix(ins) if _, err = e.cli.Delete(context.TODO(), prefix); err != nil { log.Error("etcd: unregister client.Delete(%v) appid(%s) hostname(%s) error(%v)", prefix, ins.AppID, ins.Hostname, err) } log.Info("etcd: unregister client.Delete(%v) appid(%s) hostname(%s) success", prefix, ins.AppID, ins.Hostname) return } func (e *EtcdBuilder) keyPrefix(ins *naming.Instance) string { return fmt.Sprintf("/%s/%s/%s", etcdPrefix, ins.AppID, ins.Hostname) } // Close stop all running process including etcdfetch and register func (e *EtcdBuilder) Close() error { e.cancelFunc() return nil } func (a *appInfo) watch(appID string) { _ = a.fetchstore(appID) prefix := fmt.Sprintf("/%s/%s/", etcdPrefix, appID) rch := a.e.cli.Watch(a.e.ctx, prefix, clientv3.WithPrefix()) for wresp := range rch { for _, ev := range wresp.Events { if ev.Type == mvccpb.PUT || ev.Type == mvccpb.DELETE { _ = a.fetchstore(appID) } } } } func (a *appInfo) fetchstore(appID string) (err error) { prefix := fmt.Sprintf("/%s/%s/", etcdPrefix, appID) resp, err := a.e.cli.Get(a.e.ctx, prefix, clientv3.WithPrefix()) if err != nil { log.Error("etcd: fetch client.Get(%s) error(%+v)", prefix, err) return err } ins, err := a.paserIns(resp) if err != nil { return err } a.store(ins) return nil } func (a *appInfo) store(ins *naming.InstancesInfo) { a.ins.Store(ins) a.e.mutex.RLock() for rs := range a.resolver { select { case rs.event <- struct{}{}: default: } } a.e.mutex.RUnlock() } func (a *appInfo) paserIns(resp *clientv3.GetResponse) (ins *naming.InstancesInfo, err error) { ins = &naming.InstancesInfo{ Instances: make(map[string][]*naming.Instance), } for _, ev := range resp.Kvs { in := new(naming.Instance) err := json.Unmarshal(ev.Value, in) if err != nil { return nil, err } ins.Instances[in.Zone] = append(ins.Instances[in.Zone], in) } return ins, nil } // Watch watch instance. func (r *Resolve) Watch() <-chan struct{} { return r.event } // Fetch fetch resolver instance. func (r *Resolve) Fetch(ctx context.Context) (ins *naming.InstancesInfo, ok bool) { r.e.mutex.RLock() app, ok := r.e.apps[r.id] r.e.mutex.RUnlock() if ok { ins, ok = app.ins.Load().(*naming.InstancesInfo) return } return } // Close close resolver. func (r *Resolve) Close() error { r.e.mutex.Lock() if app, ok := r.e.apps[r.id]; ok && len(app.resolver) != 0 { delete(app.resolver, r) } r.e.mutex.Unlock() return nil }