package etcd import ( "context" "fmt" "math/rand" "time" "github.com/go-kratos/kratos/v2/registry" clientv3 "go.etcd.io/etcd/client/v3" ) var ( _ registry.Registrar = &Registry{} _ registry.Discovery = &Registry{} ) // Option is etcd registry option. type Option func(o *options) type options struct { ctx context.Context namespace string ttl time.Duration maxRetry int } // Context with registry context. func Context(ctx context.Context) Option { return func(o *options) { o.ctx = ctx } } // Namespace with registry namespace. func Namespace(ns string) Option { return func(o *options) { o.namespace = ns } } // RegisterTTL with register ttl. func RegisterTTL(ttl time.Duration) Option { return func(o *options) { o.ttl = ttl } } func MaxRetry(num int) Option { return func(o *options) { o.maxRetry = num } } // Registry is etcd registry. type Registry struct { opts *options client *clientv3.Client kv clientv3.KV lease clientv3.Lease } // New creates etcd registry func New(client *clientv3.Client, opts ...Option) (r *Registry) { op := &options{ ctx: context.Background(), namespace: "/microservices", ttl: time.Second * 15, maxRetry: 5, } for _, o := range opts { o(op) } return &Registry{ opts: op, client: client, kv: clientv3.NewKV(client), } } // Register the registration. func (r *Registry) Register(ctx context.Context, service *registry.ServiceInstance) error { key := fmt.Sprintf("%s/%s/%s", r.opts.namespace, service.Name, service.ID) value, err := marshal(service) if err != nil { return err } if r.lease != nil { r.lease.Close() } r.lease = clientv3.NewLease(r.client) leaseID, err := r.registerWithKV(ctx, key, value) if err != nil { return err } go r.heartBeat(r.opts.ctx, leaseID, key, value) return nil } // Deregister the registration. func (r *Registry) Deregister(ctx context.Context, service *registry.ServiceInstance) error { defer func() { if r.lease != nil { r.lease.Close() } }() key := fmt.Sprintf("%s/%s/%s", r.opts.namespace, service.Name, service.ID) _, err := r.client.Delete(ctx, key) return err } // GetService return the service instances in memory according to the service name. func (r *Registry) GetService(ctx context.Context, name string) ([]*registry.ServiceInstance, error) { key := fmt.Sprintf("%s/%s", r.opts.namespace, name) resp, err := r.kv.Get(ctx, key, clientv3.WithPrefix()) if err != nil { return nil, err } items := make([]*registry.ServiceInstance, len(resp.Kvs)) for i, kv := range resp.Kvs { si, err := unmarshal(kv.Value) if err != nil { return nil, err } items[i] = si } return items, nil } // Watch creates a watcher according to the service name. func (r *Registry) Watch(ctx context.Context, name string) (registry.Watcher, error) { key := fmt.Sprintf("%s/%s", r.opts.namespace, name) return newWatcher(ctx, key, r.client) } // registerWithKV create a new lease, return current leaseID func (r *Registry) registerWithKV(ctx context.Context, key string, value string) (clientv3.LeaseID, error) { grant, err := r.lease.Grant(ctx, int64(r.opts.ttl.Seconds())) if err != nil { return 0, err } _, err = r.client.Put(ctx, key, value, clientv3.WithLease(grant.ID)) if err != nil { return 0, err } return grant.ID, nil } func (r *Registry) heartBeat(ctx context.Context, leaseID clientv3.LeaseID, key string, value string) { curLeaseID := leaseID kac, err := r.client.KeepAlive(ctx, leaseID) if err != nil { curLeaseID = 0 } rand.Seed(time.Now().Unix()) for { if curLeaseID == 0 { // try to registerWithKV retreat := []int{} for retryCnt := 0; retryCnt < r.opts.maxRetry; retryCnt++ { if ctx.Err() != nil { return } // prevent infinite blocking idChan := make(chan clientv3.LeaseID, 1) errChan := make(chan error, 1) cancelCtx, cancel := context.WithCancel(ctx) go func() { defer cancel() id, registerErr := r.registerWithKV(cancelCtx, key, value) if registerErr != nil { errChan <- registerErr } else { idChan <- id } }() select { case <-time.After(3 * time.Second): cancel() continue case <-errChan: continue case curLeaseID = <-idChan: } kac, err = r.client.KeepAlive(ctx, curLeaseID) if err == nil { break } retreat = append(retreat, 1<