feat(registry): zookeeper watch node changed (#1986)

* feat(registry): zookeeper watch node changed

* fix lint

* fix lint

* fix lint

* fix lint

* fix(stop): cancel context when stop

* fix(mod): add replace
pull/2014/head
Ccheers 3 years ago committed by GitHub
parent c412d65f57
commit 8dec7cf5e8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      contrib/registry/zookeeper/go.mod
  2. 3
      contrib/registry/zookeeper/go.sum
  3. 106
      contrib/registry/zookeeper/register.go
  4. 16
      contrib/registry/zookeeper/register_test.go
  5. 23
      contrib/registry/zookeeper/service.go
  6. 89
      contrib/registry/zookeeper/watcher.go

@ -5,6 +5,6 @@ go 1.16
require (
github.com/go-kratos/kratos/v2 v2.2.2
github.com/go-zookeeper/zk v1.0.2
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c
)
replace github.com/go-kratos/kratos/v2 => ../../../

@ -26,6 +26,8 @@ github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7
github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-kratos/aegis v0.1.1/go.mod h1:jYeSQ3Gesba478zEnujOiG5QdsyF3Xk/8owFUeKcHxw=
github.com/go-kratos/kratos/v2 v2.2.2 h1:5omkfwLiaNnfLdpJkpLwz5beGCehSTDL7vdPQ3lXGtI=
github.com/go-kratos/kratos/v2 v2.2.2/go.mod h1:yebXu5KMayLjXZzMTY5HWIPRDwcBehHpiNF/Ot8A2pA=
github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/logr v1.2.1/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-logr/stdr v1.2.0/go.mod h1:YkVgnZu1ZjjL7xTxrfm/LLZBfkhTqSR1ydtm6jTKKwI=
@ -96,6 +98,7 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=

@ -2,13 +2,10 @@ package zookeeper
import (
"context"
"encoding/json"
"path"
"sync"
"sync/atomic"
"time"
"github.com/go-zookeeper/zk"
"golang.org/x/sync/singleflight"
"github.com/go-kratos/kratos/v2/registry"
)
@ -22,26 +19,14 @@ var (
type Option func(o *options)
type options struct {
ctx context.Context
rootPath string
timeout time.Duration
namespace string
user string
password string
}
// WithContext with registry context.
func WithContext(ctx context.Context) Option {
return func(o *options) { o.ctx = ctx }
}
// WithRootPath with registry root path.
func WithRootPath(path string) Option {
return func(o *options) { o.rootPath = path }
}
// WithTimeout with registry timeout.
func WithTimeout(timeout time.Duration) Option {
return func(o *options) { o.timeout = timeout }
return func(o *options) { o.namespace = path }
}
// WithDigestACL with registry password.
@ -56,34 +41,21 @@ func WithDigestACL(user string, password string) Option {
type Registry struct {
opts *options
conn *zk.Conn
lock sync.Mutex
registry map[string]*serviceSet
group singleflight.Group
}
func New(zkServers []string, opts ...Option) (*Registry, error) {
func New(conn *zk.Conn, opts ...Option) *Registry {
options := &options{
ctx: context.Background(),
rootPath: "/microservices",
timeout: time.Second * 5,
namespace: "/microservices",
}
for _, o := range opts {
o(options)
}
conn, _, err := zk.Connect(zkServers, options.timeout)
if err != nil {
return nil, err
}
if len(options.user) > 0 && len(options.password) > 0 {
err = conn.AddAuth("digest", []byte(options.user+":"+options.password))
if err != nil {
return nil, err
}
}
return &Registry{
opts: options,
conn: conn,
registry: make(map[string]*serviceSet),
}, err
}
}
func (r *Registry) Register(ctx context.Context, service *registry.ServiceInstance) error {
@ -91,14 +63,14 @@ func (r *Registry) Register(ctx context.Context, service *registry.ServiceInstan
data []byte
err error
)
if err = r.ensureName(r.opts.rootPath, []byte(""), 0); err != nil {
if err = r.ensureName(r.opts.namespace, []byte(""), 0); err != nil {
return err
}
serviceNamePath := path.Join(r.opts.rootPath, service.Name)
serviceNamePath := path.Join(r.opts.namespace, service.Name)
if err = r.ensureName(serviceNamePath, []byte(""), 0); err != nil {
return err
}
if data, err = json.Marshal(service); err != nil {
if data, err = marshal(service); err != nil {
return err
}
servicePath := path.Join(serviceNamePath, service.ID)
@ -111,7 +83,7 @@ func (r *Registry) Register(ctx context.Context, service *registry.ServiceInstan
// Deregister registry service to zookeeper.
func (r *Registry) Deregister(ctx context.Context, service *registry.ServiceInstance) error {
ch := make(chan error, 1)
servicePath := path.Join(r.opts.rootPath, service.Name, service.ID)
servicePath := path.Join(r.opts.namespace, service.Name, service.ID)
go func() {
err := r.conn.Delete(servicePath, -1)
ch <- err
@ -127,68 +99,36 @@ func (r *Registry) Deregister(ctx context.Context, service *registry.ServiceInst
// GetService get services from zookeeper
func (r *Registry) GetService(ctx context.Context, serviceName string) ([]*registry.ServiceInstance, error) {
serviceNamePath := path.Join(r.opts.rootPath, serviceName)
instances, err, _ := r.group.Do(serviceName, func() (interface{}, error) {
serviceNamePath := path.Join(r.opts.namespace, serviceName)
servicesID, _, err := r.conn.Children(serviceNamePath)
if err != nil {
return nil, err
}
items := make([]*registry.ServiceInstance, 0, len(servicesID))
for _, service := range servicesID {
item := &registry.ServiceInstance{}
servicePath := path.Join(serviceNamePath, service)
serviceInstanceByte, _, err := r.conn.Get(servicePath)
if err != nil {
return nil, err
}
if err := json.Unmarshal(serviceInstanceByte, item); err != nil {
item, err := unmarshal(serviceInstanceByte)
if err != nil {
return nil, err
}
items = append(items, item)
}
return items, nil
})
if err != nil {
return nil, err
}
return instances.([]*registry.ServiceInstance), nil
}
func (r *Registry) Watch(ctx context.Context, serviceName string) (registry.Watcher, error) {
r.lock.Lock()
defer r.lock.Unlock()
set, ok := r.registry[serviceName]
if !ok {
set = &serviceSet{
watcher: make(map[*watcher]struct{}),
services: &atomic.Value{},
serviceName: serviceName,
}
r.registry[serviceName] = set
}
// 初始化watcher
w := &watcher{
event: make(chan struct{}, 1),
}
w.ctx, w.cancel = context.WithCancel(context.Background())
w.set = set
set.lock.Lock()
set.watcher[w] = struct{}{}
set.lock.Unlock()
ss, _ := set.services.Load().([]*registry.ServiceInstance)
if len(ss) > 0 {
// 如果services有值需要推送给watcher,否则watch的时候可能会永远阻塞拿不到初始的数据
w.event <- struct{}{}
}
// 放在最后是为了防止漏推送
if !ok {
go r.resolve(set)
}
return w, nil
}
func (r *Registry) resolve(ss *serviceSet) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
services, err := r.GetService(ctx, ss.serviceName)
cancel()
if err == nil && len(services) > 0 {
ss.broadcast(services)
}
prefix := path.Join(r.opts.namespace, serviceName)
return newWatcher(ctx, prefix, serviceName, r.conn)
}
// ensureName ensure node exists, if not exist, create and set data

@ -6,6 +6,7 @@ import (
"time"
"github.com/go-kratos/kratos/v2/registry"
"github.com/go-zookeeper/zk"
)
func TestRegistry(t *testing.T) {
@ -16,11 +17,23 @@ func TestRegistry(t *testing.T) {
Endpoints: []string{"http://127.0.0.1:1111"},
}
r, _ := New([]string{"127.0.0.1:2181"}, WithDigestACL("username", "password"))
conn, _, err := zk.Connect([]string{"127.0.0.1:2181"}, time.Second*15)
if err != nil {
t.Fatal(err)
return
}
r := New(conn)
err = r.Register(ctx, s)
if err != nil {
t.Fatal(err)
return
}
time.Sleep(time.Second)
w, err := r.Watch(ctx, s.Name)
if err != nil {
t.Fatal(err)
return
}
defer func() {
_ = w.Stop()
@ -29,6 +42,7 @@ func TestRegistry(t *testing.T) {
for {
res, nextErr := w.Next()
if nextErr != nil {
t.Errorf("watch next error: %s", nextErr)
return
}
t.Logf("watch: %d", len(res))

@ -1,27 +1,16 @@
package zookeeper
import (
"sync"
"sync/atomic"
"encoding/json"
"github.com/go-kratos/kratos/v2/registry"
)
type serviceSet struct {
serviceName string
watcher map[*watcher]struct{}
services *atomic.Value
lock sync.RWMutex
func marshal(si *registry.ServiceInstance) ([]byte, error) {
return json.Marshal(si)
}
func (s *serviceSet) broadcast(ss []*registry.ServiceInstance) {
s.services.Store(ss)
s.lock.RLock()
defer s.lock.RUnlock()
for k := range s.watcher {
select {
case k.event <- struct{}{}:
default:
}
}
func unmarshal(data []byte) (si *registry.ServiceInstance, err error) {
err = json.Unmarshal(data, &si)
return
}

@ -2,36 +2,101 @@ package zookeeper
import (
"context"
"errors"
"path"
"sync/atomic"
"github.com/go-kratos/kratos/v2/registry"
"github.com/go-zookeeper/zk"
)
var _ registry.Watcher = &watcher{}
var ErrWatcherStopped = errors.New("watcher stopped")
type watcher struct {
ctx context.Context
event chan zk.Event
conn *zk.Conn
cancel context.CancelFunc
event chan struct{}
set *serviceSet
first uint32
// 前缀
prefix string
// watch 的服务名
serviceName string
}
func newWatcher(ctx context.Context, prefix, serviceName string, conn *zk.Conn) (*watcher, error) {
w := &watcher{conn: conn, event: make(chan zk.Event, 1), prefix: prefix, serviceName: serviceName}
w.ctx, w.cancel = context.WithCancel(ctx)
go w.watch(w.ctx)
return w, nil
}
func (w *watcher) watch(ctx context.Context) {
for {
// 每次 watch 只有一次有效期 所以循环 watch
_, _, ch, err := w.conn.ChildrenW(w.prefix)
if err != nil {
w.event <- zk.Event{Err: err}
}
select {
case <-ctx.Done():
return
default:
w.event <- <-ch
}
}
}
func (w watcher) Next() (services []*registry.ServiceInstance, err error) {
func (w *watcher) Next() ([]*registry.ServiceInstance, error) {
// todo 如果多处调用 next 可能会导致多实例信息不同步
if atomic.CompareAndSwapUint32(&w.first, 0, 1) {
return w.getServices()
}
select {
case <-w.ctx.Done():
err = w.ctx.Err()
case <-w.event:
return nil, w.ctx.Err()
case e := <-w.event:
if e.State == zk.StateDisconnected {
return nil, ErrWatcherStopped
}
ss, ok := w.set.services.Load().([]*registry.ServiceInstance)
if ok {
services = append(services, ss...)
if e.Err != nil {
return nil, e.Err
}
return w.getServices()
}
return
}
func (w *watcher) Stop() error {
w.cancel()
w.set.lock.Lock()
defer w.set.lock.Unlock()
delete(w.set.watcher, w)
return nil
}
func (w *watcher) getServices() ([]*registry.ServiceInstance, error) {
servicesID, _, err := w.conn.Children(w.prefix)
if err != nil {
return nil, err
}
items := make([]*registry.ServiceInstance, 0, len(servicesID))
for _, id := range servicesID {
servicePath := path.Join(w.prefix, id)
b, _, err := w.conn.Get(servicePath)
if err != nil {
return nil, err
}
item, err := unmarshal(b)
if err != nil {
return nil, err
}
// 与 watch 的服务名不同 则跳过
if item.Name != w.serviceName {
continue
}
items = append(items, item)
}
return items, nil
}

Loading…
Cancel
Save