diff --git a/contrib/registry/zookeeper/go.mod b/contrib/registry/zookeeper/go.mod index 43bc93499..bb3bb4826 100644 --- a/contrib/registry/zookeeper/go.mod +++ b/contrib/registry/zookeeper/go.mod @@ -5,6 +5,6 @@ go 1.16 require ( github.com/go-kratos/kratos/v2 v2.2.2 github.com/go-zookeeper/zk v1.0.2 + golang.org/x/sync v0.0.0-20210220032951-036812b2e83c ) - replace github.com/go-kratos/kratos/v2 => ../../../ diff --git a/contrib/registry/zookeeper/go.sum b/contrib/registry/zookeeper/go.sum index 3b8d2e0f2..977a10cac 100644 --- a/contrib/registry/zookeeper/go.sum +++ b/contrib/registry/zookeeper/go.sum @@ -26,6 +26,8 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7 github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/go-kratos/aegis v0.1.1/go.mod h1:jYeSQ3Gesba478zEnujOiG5QdsyF3Xk/8owFUeKcHxw= +github.com/go-kratos/kratos/v2 v2.2.2 h1:5omkfwLiaNnfLdpJkpLwz5beGCehSTDL7vdPQ3lXGtI= +github.com/go-kratos/kratos/v2 v2.2.2/go.mod h1:yebXu5KMayLjXZzMTY5HWIPRDwcBehHpiNF/Ot8A2pA= github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.1/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/stdr v1.2.0/go.mod h1:YkVgnZu1ZjjL7xTxrfm/LLZBfkhTqSR1ydtm6jTKKwI= @@ -96,6 +98,7 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/contrib/registry/zookeeper/register.go b/contrib/registry/zookeeper/register.go index adae0050f..2aedc198a 100644 --- a/contrib/registry/zookeeper/register.go +++ b/contrib/registry/zookeeper/register.go @@ -2,13 +2,10 @@ package zookeeper import ( "context" - "encoding/json" "path" - "sync" - "sync/atomic" - "time" "github.com/go-zookeeper/zk" + "golang.org/x/sync/singleflight" "github.com/go-kratos/kratos/v2/registry" ) @@ -22,26 +19,14 @@ var ( type Option func(o *options) type options struct { - ctx context.Context - rootPath string - timeout time.Duration - user string - password string -} - -// WithContext with registry context. -func WithContext(ctx context.Context) Option { - return func(o *options) { o.ctx = ctx } + namespace string + user string + password string } // WithRootPath with registry root path. func WithRootPath(path string) Option { - return func(o *options) { o.rootPath = path } -} - -// WithTimeout with registry timeout. -func WithTimeout(timeout time.Duration) Option { - return func(o *options) { o.timeout = timeout } + return func(o *options) { o.namespace = path } } // WithDigestACL with registry password. @@ -54,36 +39,23 @@ func WithDigestACL(user string, password string) Option { // Registry is consul registry type Registry struct { - opts *options - conn *zk.Conn - lock sync.Mutex - registry map[string]*serviceSet + opts *options + conn *zk.Conn + + group singleflight.Group } -func New(zkServers []string, opts ...Option) (*Registry, error) { +func New(conn *zk.Conn, opts ...Option) *Registry { options := &options{ - ctx: context.Background(), - rootPath: "/microservices", - timeout: time.Second * 5, + namespace: "/microservices", } for _, o := range opts { o(options) } - conn, _, err := zk.Connect(zkServers, options.timeout) - if err != nil { - return nil, err - } - if len(options.user) > 0 && len(options.password) > 0 { - err = conn.AddAuth("digest", []byte(options.user+":"+options.password)) - if err != nil { - return nil, err - } - } return &Registry{ - opts: options, - conn: conn, - registry: make(map[string]*serviceSet), - }, err + opts: options, + conn: conn, + } } func (r *Registry) Register(ctx context.Context, service *registry.ServiceInstance) error { @@ -91,14 +63,14 @@ func (r *Registry) Register(ctx context.Context, service *registry.ServiceInstan data []byte err error ) - if err = r.ensureName(r.opts.rootPath, []byte(""), 0); err != nil { + if err = r.ensureName(r.opts.namespace, []byte(""), 0); err != nil { return err } - serviceNamePath := path.Join(r.opts.rootPath, service.Name) + serviceNamePath := path.Join(r.opts.namespace, service.Name) if err = r.ensureName(serviceNamePath, []byte(""), 0); err != nil { return err } - if data, err = json.Marshal(service); err != nil { + if data, err = marshal(service); err != nil { return err } servicePath := path.Join(serviceNamePath, service.ID) @@ -111,7 +83,7 @@ func (r *Registry) Register(ctx context.Context, service *registry.ServiceInstan // Deregister registry service to zookeeper. func (r *Registry) Deregister(ctx context.Context, service *registry.ServiceInstance) error { ch := make(chan error, 1) - servicePath := path.Join(r.opts.rootPath, service.Name, service.ID) + servicePath := path.Join(r.opts.namespace, service.Name, service.ID) go func() { err := r.conn.Delete(servicePath, -1) ch <- err @@ -127,68 +99,36 @@ func (r *Registry) Deregister(ctx context.Context, service *registry.ServiceInst // GetService get services from zookeeper func (r *Registry) GetService(ctx context.Context, serviceName string) ([]*registry.ServiceInstance, error) { - serviceNamePath := path.Join(r.opts.rootPath, serviceName) - servicesID, _, err := r.conn.Children(serviceNamePath) - if err != nil { - return nil, err - } - items := make([]*registry.ServiceInstance, 0, len(servicesID)) - for _, service := range servicesID { - item := ®istry.ServiceInstance{} - servicePath := path.Join(serviceNamePath, service) - serviceInstanceByte, _, err := r.conn.Get(servicePath) + instances, err, _ := r.group.Do(serviceName, func() (interface{}, error) { + serviceNamePath := path.Join(r.opts.namespace, serviceName) + servicesID, _, err := r.conn.Children(serviceNamePath) if err != nil { return nil, err } - if err := json.Unmarshal(serviceInstanceByte, item); err != nil { - return nil, err + items := make([]*registry.ServiceInstance, 0, len(servicesID)) + for _, service := range servicesID { + servicePath := path.Join(serviceNamePath, service) + serviceInstanceByte, _, err := r.conn.Get(servicePath) + if err != nil { + return nil, err + } + item, err := unmarshal(serviceInstanceByte) + if err != nil { + return nil, err + } + items = append(items, item) } - items = append(items, item) + return items, nil + }) + if err != nil { + return nil, err } - return items, nil + return instances.([]*registry.ServiceInstance), nil } func (r *Registry) Watch(ctx context.Context, serviceName string) (registry.Watcher, error) { - r.lock.Lock() - defer r.lock.Unlock() - set, ok := r.registry[serviceName] - if !ok { - set = &serviceSet{ - watcher: make(map[*watcher]struct{}), - services: &atomic.Value{}, - serviceName: serviceName, - } - r.registry[serviceName] = set - } - // 初始化watcher - w := &watcher{ - event: make(chan struct{}, 1), - } - w.ctx, w.cancel = context.WithCancel(context.Background()) - w.set = set - set.lock.Lock() - set.watcher[w] = struct{}{} - set.lock.Unlock() - ss, _ := set.services.Load().([]*registry.ServiceInstance) - if len(ss) > 0 { - // 如果services有值需要推送给watcher,否则watch的时候可能会永远阻塞拿不到初始的数据 - w.event <- struct{}{} - } - - // 放在最后是为了防止漏推送 - if !ok { - go r.resolve(set) - } - return w, nil -} - -func (r *Registry) resolve(ss *serviceSet) { - ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) - services, err := r.GetService(ctx, ss.serviceName) - cancel() - if err == nil && len(services) > 0 { - ss.broadcast(services) - } + prefix := path.Join(r.opts.namespace, serviceName) + return newWatcher(ctx, prefix, serviceName, r.conn) } // ensureName ensure node exists, if not exist, create and set data diff --git a/contrib/registry/zookeeper/register_test.go b/contrib/registry/zookeeper/register_test.go index 9293e2b93..8054548da 100644 --- a/contrib/registry/zookeeper/register_test.go +++ b/contrib/registry/zookeeper/register_test.go @@ -6,6 +6,7 @@ import ( "time" "github.com/go-kratos/kratos/v2/registry" + "github.com/go-zookeeper/zk" ) func TestRegistry(t *testing.T) { @@ -16,11 +17,23 @@ func TestRegistry(t *testing.T) { Endpoints: []string{"http://127.0.0.1:1111"}, } - r, _ := New([]string{"127.0.0.1:2181"}, WithDigestACL("username", "password")) + conn, _, err := zk.Connect([]string{"127.0.0.1:2181"}, time.Second*15) + if err != nil { + t.Fatal(err) + return + } + r := New(conn) + err = r.Register(ctx, s) + if err != nil { + t.Fatal(err) + return + } + time.Sleep(time.Second) w, err := r.Watch(ctx, s.Name) if err != nil { t.Fatal(err) + return } defer func() { _ = w.Stop() @@ -29,6 +42,7 @@ func TestRegistry(t *testing.T) { for { res, nextErr := w.Next() if nextErr != nil { + t.Errorf("watch next error: %s", nextErr) return } t.Logf("watch: %d", len(res)) diff --git a/contrib/registry/zookeeper/service.go b/contrib/registry/zookeeper/service.go index 59eb7a13d..36cd72be2 100644 --- a/contrib/registry/zookeeper/service.go +++ b/contrib/registry/zookeeper/service.go @@ -1,27 +1,16 @@ package zookeeper import ( - "sync" - "sync/atomic" + "encoding/json" "github.com/go-kratos/kratos/v2/registry" ) -type serviceSet struct { - serviceName string - watcher map[*watcher]struct{} - services *atomic.Value - lock sync.RWMutex +func marshal(si *registry.ServiceInstance) ([]byte, error) { + return json.Marshal(si) } -func (s *serviceSet) broadcast(ss []*registry.ServiceInstance) { - s.services.Store(ss) - s.lock.RLock() - defer s.lock.RUnlock() - for k := range s.watcher { - select { - case k.event <- struct{}{}: - default: - } - } +func unmarshal(data []byte) (si *registry.ServiceInstance, err error) { + err = json.Unmarshal(data, &si) + return } diff --git a/contrib/registry/zookeeper/watcher.go b/contrib/registry/zookeeper/watcher.go index c56f3d080..76f502fa8 100644 --- a/contrib/registry/zookeeper/watcher.go +++ b/contrib/registry/zookeeper/watcher.go @@ -2,36 +2,101 @@ package zookeeper import ( "context" + "errors" + "path" + "sync/atomic" "github.com/go-kratos/kratos/v2/registry" + "github.com/go-zookeeper/zk" ) var _ registry.Watcher = &watcher{} +var ErrWatcherStopped = errors.New("watcher stopped") + type watcher struct { ctx context.Context + event chan zk.Event + conn *zk.Conn cancel context.CancelFunc - event chan struct{} - set *serviceSet + + first uint32 + // 前缀 + prefix string + // watch 的服务名 + serviceName string +} + +func newWatcher(ctx context.Context, prefix, serviceName string, conn *zk.Conn) (*watcher, error) { + w := &watcher{conn: conn, event: make(chan zk.Event, 1), prefix: prefix, serviceName: serviceName} + w.ctx, w.cancel = context.WithCancel(ctx) + go w.watch(w.ctx) + return w, nil } -func (w watcher) Next() (services []*registry.ServiceInstance, err error) { +func (w *watcher) watch(ctx context.Context) { + for { + // 每次 watch 只有一次有效期 所以循环 watch + _, _, ch, err := w.conn.ChildrenW(w.prefix) + if err != nil { + w.event <- zk.Event{Err: err} + } + select { + case <-ctx.Done(): + return + default: + w.event <- <-ch + } + } +} + +func (w *watcher) Next() ([]*registry.ServiceInstance, error) { + // todo 如果多处调用 next 可能会导致多实例信息不同步 + if atomic.CompareAndSwapUint32(&w.first, 0, 1) { + return w.getServices() + } select { case <-w.ctx.Done(): - err = w.ctx.Err() - case <-w.event: + return nil, w.ctx.Err() + case e := <-w.event: + if e.State == zk.StateDisconnected { + return nil, ErrWatcherStopped + } + if e.Err != nil { + return nil, e.Err + } + return w.getServices() } - ss, ok := w.set.services.Load().([]*registry.ServiceInstance) - if ok { - services = append(services, ss...) - } - return } func (w *watcher) Stop() error { w.cancel() - w.set.lock.Lock() - defer w.set.lock.Unlock() - delete(w.set.watcher, w) return nil } + +func (w *watcher) getServices() ([]*registry.ServiceInstance, error) { + servicesID, _, err := w.conn.Children(w.prefix) + if err != nil { + return nil, err + } + items := make([]*registry.ServiceInstance, 0, len(servicesID)) + for _, id := range servicesID { + servicePath := path.Join(w.prefix, id) + b, _, err := w.conn.Get(servicePath) + if err != nil { + return nil, err + } + item, err := unmarshal(b) + if err != nil { + return nil, err + } + + // 与 watch 的服务名不同 则跳过 + if item.Name != w.serviceName { + continue + } + + items = append(items, item) + } + return items, nil +}