From fa750fdff759af04d90ab5864efbd33764270985 Mon Sep 17 00:00:00 2001 From: guobin <596194783@qq.com> Date: Mon, 21 Oct 2019 11:08:25 +0800 Subject: [PATCH 01/18] =?UTF-8?q?zookeeper=E6=8E=A5=E5=85=A5kratos=20=20?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E6=B3=A8=E5=86=8C=E5=8F=91=E7=8E=B0=E4=B8=8E?= =?UTF-8?q?=E8=B4=9F=E8=BD=BD=E5=9D=87=E8=A1=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 1 + go.sum | 2 + pkg/naming/zookeeper/zookeeper.go | 397 ++++++++++++++++++++++++++++++ 3 files changed, 400 insertions(+) create mode 100644 pkg/naming/zookeeper/zookeeper.go diff --git a/go.mod b/go.mod index ce078a8e7..67d49efd3 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/go-playground/locales v0.12.1 // indirect github.com/go-playground/universal-translator v0.16.0 // indirect github.com/go-sql-driver/mysql v1.4.1 + github.com/go-zookeeper/zk v1.0.1 // indirect github.com/gogo/protobuf v1.3.0 github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect github.com/golang/mock v1.3.1 // indirect diff --git a/go.sum b/go.sum index 1f489a404..65b167ca5 100644 --- a/go.sum +++ b/go.sum @@ -71,6 +71,8 @@ github.com/go-playground/universal-translator v0.16.0/go.mod h1:1AnU7NaIRDWWzGEK github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-zookeeper/zk v1.0.1 h1:LmXNmSnkNsNKai+aDu6sHRr8ZJzIrHJo8z8Z4sm8cT8= +github.com/go-zookeeper/zk v1.0.1/go.mod h1:gpJdHazfkmlg4V0rt0vYeHYJHSL8hHFwV0qOd+HRTJE= github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= diff --git a/pkg/naming/zookeeper/zookeeper.go b/pkg/naming/zookeeper/zookeeper.go new file mode 100644 index 000000000..778e8a5a3 --- /dev/null +++ b/pkg/naming/zookeeper/zookeeper.go @@ -0,0 +1,397 @@ +package zookeeper + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/bilibili/kratos/pkg/log" + "github.com/bilibili/kratos/pkg/naming" + "github.com/go-zookeeper/zk" +) + +const ( + RootPath = "/" +) + +type Config struct { + // Endpoints is a list of URLs. + Endpoints []string `json:"endpoints"` +} + +var ( + _once sync.Once + _builder naming.Builder + + //ErrDuplication is a register duplication err + ErrDuplication = errors.New("zookeeper: instance duplicate registration") +) + +// Builder return default zookeeper resolver builder. +func Builder(c *Config) naming.Builder { + _once.Do(func() { + _builder, _ = New(c) + }) + return _builder +} + +// Build register resolver into default zookeeper. +func Build(c *Config, id string) naming.Resolver { + return Builder(c).Build(id) +} + +// ZookeeperBuilder is a zookeeper client Builder +type ZookeeperBuilder struct { + cli *zk.Conn + connEvent <-chan zk.Event + ctx context.Context + cancelFunc context.CancelFunc + + mutex sync.RWMutex + apps map[string]*appInfo + registry map[string]struct{} +} + +type appInfo struct { + resolver map[*Resolve]struct{} + ins atomic.Value + zkb *ZookeeperBuilder + once sync.Once +} + +// Resolve zookeeper resolver. +type Resolve struct { + id string + event chan struct{} + zkb *ZookeeperBuilder +} + +// New is new a zookeeper builder +func New(c *Config) (zkb *ZookeeperBuilder, err error) { + //example: endpointSli = []string{"192.168.1.78:2181", "192.168.1.79:2181", "192.168.1.80:2181"} + if len(c.Endpoints) == 0 { + errInfo := fmt.Sprintf("zookeeper New failed, endpoints is null") + log.Error(errInfo) + return nil, errors.New(errInfo) + } + + zkConn, connEvent, err := zk.Connect(c.Endpoints, 5*time.Second) + if err != nil { + log.Error(fmt.Sprintf("zk Connect err:(%v)", err)) + return + } else { + log.Info(fmt.Sprintf("zk Connect ok!")) + } + + ctx, cancel := context.WithCancel(context.Background()) + zkb = &ZookeeperBuilder{ + cli: zkConn, + connEvent: connEvent, + ctx: ctx, + cancelFunc: cancel, + apps: map[string]*appInfo{}, + registry: map[string]struct{}{}, + } + return +} + +// Build zookeeper resovler builder. +func (z *ZookeeperBuilder) Build(appid string) naming.Resolver { + r := &Resolve{ + id: appid, + zkb: z, + event: make(chan struct{}, 1), + } + z.mutex.Lock() + app, ok := z.apps[appid] + if !ok { + app = &appInfo{ + resolver: make(map[*Resolve]struct{}), + zkb: z, + } + z.apps[appid] = app + } + app.resolver[r] = struct{}{} + z.mutex.Unlock() + if ok { + select { + case r.event <- struct{}{}: + default: + } + } + + app.once.Do(func() { + go app.watch(appid) + log.Info("zookeeper: AddWatch(%s) already watch(%v)", appid, ok) + }) + return r +} + +// Scheme return zookeeper's scheme +func (z *ZookeeperBuilder) Scheme() string { + return "zookeeper" +} + +// Register is register instance +func (z *ZookeeperBuilder) Register(ctx context.Context, ins *naming.Instance) (cancelFunc context.CancelFunc, err error) { + z.mutex.Lock() + if _, ok := z.registry[ins.AppID]; ok { + err = ErrDuplication + } else { + z.registry[ins.AppID] = struct{}{} + } + z.mutex.Unlock() + if err != nil { + return + } + ctx, cancel := context.WithCancel(z.ctx) + if err = z.register(ctx, ins); err != nil { + z.mutex.Lock() + delete(z.registry, ins.AppID) + z.mutex.Unlock() + cancel() + return + } + ch := make(chan struct{}, 1) + cancelFunc = context.CancelFunc(func() { + cancel() + <-ch + }) + + go func() { + for { + select { + case connEvent := <-z.connEvent: + log.Warn("watch zkClient state, connEvent:(%v)", connEvent) + if connEvent.State == zk.StateHasSession { + log.Warn("watch zkClient state, state is StateHasSession...") + err = z.register(ctx, ins) + if err != nil { + log.Warn(fmt.Sprintf("watch zkClient state, fail to register node error:(%v)", err)) + continue + } + } + case <-time.After(time.Second): + } + } + }() + return +} + +func (z *ZookeeperBuilder) registerPerServer(name string) (err error) { + var ( + str string + ) + + str, err = z.cli.Create(name, nil, 0, zk.WorldACL(zk.PermAll)) + if err != nil { + log.Warn(fmt.Sprintf("registerPerServer, fail to Create node:(%s). err:(%v)", name, err)) + } else { + log.Info(fmt.Sprintf("registerPerServer, succeed to Create node:(%s). retStr:(%s)", name, str)) + } + + return +} + +func (z *ZookeeperBuilder) registerEphServer(name, host string, ins *naming.Instance) (err error) { + var ( + str string + ) + + val, _ := json.Marshal(ins) + log.Info(fmt.Sprintf("registerEphServer, ins after json.Marshal:(%v)", string(val))) + + str, err = z.cli.Create(name+host, val, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) + if err != nil { + log.Warn(fmt.Sprintf("registerEphServer, fail to Create node:%s. err:(%v)", name+host, err)) + } else { + log.Info(fmt.Sprintf("registerEphServer, succeed to Create node:%s. retStr:(%s)", name+host, str)) + } + + return +} + +// register 注册zookeeper节点 +func (z *ZookeeperBuilder) register(ctx context.Context, ins *naming.Instance) (err error) { + log.Info("zookeeper register enter, instance Addrs:(%v)", ins.Addrs) + prefix := z.keyPrefix(ins) + + err = z.registerPerServer(prefix) + if err != nil { + log.Warn(fmt.Sprintf("register, fail to registerPerServer node error:(%v)", err)) + } + + var svrAddr string + for _, val := range ins.Addrs { + if strings.HasPrefix(val, "grpc://") { + svrAddr = strings.TrimPrefix(val, "grpc://") + break + } + } + if svrAddr == "" { + errInfo := fmt.Sprintf("register, error occur, grpc svrAddr is null") + log.Error(errInfo) + return errors.New(errInfo) + } + + err = z.registerEphServer(prefix, RootPath+svrAddr, ins) + if err != nil { + log.Error(fmt.Sprintf("registerServer, fail to RegisterEphServer node error:(%v)", err)) + //return + } else { + log.Info(fmt.Sprintf("registerServer, succeed to RegistServer node.")) + } + + return nil +} + +// unregister 删除zookeeper中节点信息 +func (z *ZookeeperBuilder) unregister(ins *naming.Instance) (err error) { + log.Info("zookeeper unregister enter, instance Addrs:(%v)", ins.Addrs) + prefix := z.keyPrefix(ins) + + var svrAddr string + for _, val := range ins.Addrs { + if strings.HasPrefix(val, "grpc://") { + svrAddr = strings.TrimPrefix(val, "grpc://") + break + } + } + if svrAddr == "" { + errInfo := fmt.Sprintf("unregister, error occur, grpc svrAddr is null") + log.Error(errInfo) + return errors.New(errInfo) + } + + strNode := prefix + RootPath + svrAddr + exists, _, err := z.cli.Exists(strNode) + if err != nil { + log.Error("zk.Conn.Exists node:(%v), error:(%s)", strNode, err.Error()) + return err + } + if exists { + _, s, err := z.cli.Get(strNode) + if err != nil { + log.Error("zk.Conn.Get node:(%s), error:(%s)", strNode, err.Error()) + return err + } + return z.cli.Delete(strNode, s.Version) + } + + log.Info(fmt.Sprintf("unregister, client.Delete:(%v), appid:(%v), hostname:(%v) success", strNode, ins.AppID, ins.Hostname)) + return +} + +func (z *ZookeeperBuilder) keyPrefix(ins *naming.Instance) string { + return fmt.Sprintf("/%s", ins.AppID) +} + +// Close stop all running process including zk fetch and register +func (z *ZookeeperBuilder) Close() error { + z.cancelFunc() + return nil +} + +func (a *appInfo) watch(appID string) { + _ = a.fetchstore(appID) + prefix := fmt.Sprintf("/%s", appID) + + go func() { + for { + log.Info(fmt.Sprintf("zk ChildrenW enter, prefix:(%v)", prefix)) + snapshot, _, event, err := a.zkb.cli.ChildrenW(prefix) + if err != nil { + continue + } + + log.Info(fmt.Sprintf("zk ChildrenW ok, snapshot:(%v)", snapshot)) + for ev := range event { + log.Info(fmt.Sprintf("zk ChildrenW ok, prefix:(%v), event Path:(%v), Type:(%v)", prefix, ev.Path, ev.Type)) + if ev.Type == zk.EventNodeChildrenChanged { + _ = a.fetchstore(appID) + } + } + } + }() +} + +func (a *appInfo) fetchstore(appID string) (err error) { + prefix := fmt.Sprintf("/%s", appID) + strNode := "" + childs, _, err := a.zkb.cli.Children(prefix) + if err != nil { + log.Error(fmt.Sprintf("fetchstore, fail to get Children of node:(%v), err:(%v)", prefix, err)) + } else { + log.Info(fmt.Sprintf("fetchstore, ok to get Children of node:(%v), childs:(%v)", prefix, childs)) + } + + ins := &naming.InstancesInfo{ + Instances: make(map[string][]*naming.Instance, 0), + } + + //for range childs + for _, child := range childs { + strNode = prefix + RootPath + child + resp, _, err := a.zkb.cli.Get(strNode) + if err != nil { + log.Error("zookeeper: fetch client.Get(%s) error(%v)", strNode, err) + return err + } + + in := new(naming.Instance) + err = json.Unmarshal(resp, in) + if err != nil { + return err + } + ins.Instances[in.Zone] = append(ins.Instances[in.Zone], in) + + } + a.store(ins) + + return nil +} + +func (a *appInfo) store(ins *naming.InstancesInfo) { + + a.ins.Store(ins) + a.zkb.mutex.RLock() + for rs := range a.resolver { + select { + case rs.event <- struct{}{}: + default: + } + } + a.zkb.mutex.RUnlock() +} + +// Watch watch instance. +func (r *Resolve) Watch() <-chan struct{} { + return r.event +} + +// Fetch fetch resolver instance. +func (r *Resolve) Fetch(ctx context.Context) (ins *naming.InstancesInfo, ok bool) { + r.zkb.mutex.RLock() + app, ok := r.zkb.apps[r.id] + r.zkb.mutex.RUnlock() + if ok { + ins, ok = app.ins.Load().(*naming.InstancesInfo) + return + } + return +} + +// Close close resolver. +func (r *Resolve) Close() error { + r.zkb.mutex.Lock() + if app, ok := r.zkb.apps[r.id]; ok && len(app.resolver) != 0 { + delete(app.resolver, r) + } + r.zkb.mutex.Unlock() + return nil +} From d9f57d5647f7f463317329ee9bc8e547a313fe48 Mon Sep 17 00:00:00 2001 From: guobin <596194783@qq.com> Date: Wed, 23 Oct 2019 15:50:06 +0800 Subject: [PATCH 02/18] =?UTF-8?q?zookeeper=E6=8E=A5=E5=85=A5kratos=20=20?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E6=B3=A8=E5=86=8C=E5=8F=91=E7=8E=B0=E4=B8=8E?= =?UTF-8?q?=E8=B4=9F=E8=BD=BD=E5=9D=87=E8=A1=A1=201.=20=E5=8A=A0=E5=85=A5c?= =?UTF-8?q?tx.Done=E7=9A=84=E6=83=85=E5=86=B5=E9=98=B2=E6=AD=A2groutine=20?= =?UTF-8?q?=E6=B3=84=E9=9C=B2=202.=20=E6=B3=A8=E5=86=8C=E7=9A=84=E5=8D=8F?= =?UTF-8?q?=E8=AE=AE=EF=BC=8C=E5=85=81=E8=AE=B8=E6=B3=A8=E5=86=8C=E5=A4=9A?= =?UTF-8?q?=E7=A7=8D=E5=8D=8F=E8=AE=AE=EF=BC=8C=E5=85=B7=E4=BD=93=E9=80=89?= =?UTF-8?q?=E6=8B=A9=E5=93=AA=E7=A7=8D=E5=8D=8F=E8=AE=AE=E6=9C=89client?= =?UTF-8?q?=E7=AB=AF=E9=80=89=E6=8B=A9=203.=20=E5=8E=BB=E6=8E=89RootPath?= =?UTF-8?q?=E5=B8=B8=E9=87=8F=EF=BC=8C=E9=81=BF=E5=85=8D=E6=AD=A7=E4=B9=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/naming/zookeeper/zookeeper.go | 69 ++++++++++--------------------- 1 file changed, 22 insertions(+), 47 deletions(-) diff --git a/pkg/naming/zookeeper/zookeeper.go b/pkg/naming/zookeeper/zookeeper.go index 778e8a5a3..9e50f130f 100644 --- a/pkg/naming/zookeeper/zookeeper.go +++ b/pkg/naming/zookeeper/zookeeper.go @@ -5,7 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "strings" "sync" "sync/atomic" "time" @@ -15,10 +14,6 @@ import ( "github.com/go-zookeeper/zk" ) -const ( - RootPath = "/" -) - type Config struct { // Endpoints is a list of URLs. Endpoints []string `json:"endpoints"` @@ -176,7 +171,9 @@ func (z *ZookeeperBuilder) Register(ctx context.Context, ins *naming.Instance) ( continue } } - case <-time.After(time.Second): + case <-ctx.Done(): + ch <- struct{}{} + return } } }() @@ -226,26 +223,14 @@ func (z *ZookeeperBuilder) register(ctx context.Context, ins *naming.Instance) ( log.Warn(fmt.Sprintf("register, fail to registerPerServer node error:(%v)", err)) } - var svrAddr string for _, val := range ins.Addrs { - if strings.HasPrefix(val, "grpc://") { - svrAddr = strings.TrimPrefix(val, "grpc://") - break + err = z.registerEphServer(prefix, "/"+val, ins) + if err != nil { + log.Warn(fmt.Sprintf("registerServer, fail to RegisterEphServer node error:(%v)", err)) + } else { + log.Info(fmt.Sprintf("registerServer, succeed to RegistServer node.")) } } - if svrAddr == "" { - errInfo := fmt.Sprintf("register, error occur, grpc svrAddr is null") - log.Error(errInfo) - return errors.New(errInfo) - } - - err = z.registerEphServer(prefix, RootPath+svrAddr, ins) - if err != nil { - log.Error(fmt.Sprintf("registerServer, fail to RegisterEphServer node error:(%v)", err)) - //return - } else { - log.Info(fmt.Sprintf("registerServer, succeed to RegistServer node.")) - } return nil } @@ -255,35 +240,25 @@ func (z *ZookeeperBuilder) unregister(ins *naming.Instance) (err error) { log.Info("zookeeper unregister enter, instance Addrs:(%v)", ins.Addrs) prefix := z.keyPrefix(ins) - var svrAddr string for _, val := range ins.Addrs { - if strings.HasPrefix(val, "grpc://") { - svrAddr = strings.TrimPrefix(val, "grpc://") - break - } - } - if svrAddr == "" { - errInfo := fmt.Sprintf("unregister, error occur, grpc svrAddr is null") - log.Error(errInfo) - return errors.New(errInfo) - } - - strNode := prefix + RootPath + svrAddr - exists, _, err := z.cli.Exists(strNode) - if err != nil { - log.Error("zk.Conn.Exists node:(%v), error:(%s)", strNode, err.Error()) - return err - } - if exists { - _, s, err := z.cli.Get(strNode) + strNode := prefix + "/" + val + exists, _, err := z.cli.Exists(strNode) if err != nil { - log.Error("zk.Conn.Get node:(%s), error:(%s)", strNode, err.Error()) + log.Error("zk.Conn.Exists node:(%v), error:(%s)", strNode, err.Error()) return err } - return z.cli.Delete(strNode, s.Version) + if exists { + _, s, err := z.cli.Get(strNode) + if err != nil { + log.Error("zk.Conn.Get node:(%s), error:(%s)", strNode, err.Error()) + return err + } + return z.cli.Delete(strNode, s.Version) + } + + log.Info(fmt.Sprintf("unregister, client.Delete:(%v), appid:(%v), hostname:(%v) success", strNode, ins.AppID, ins.Hostname)) } - log.Info(fmt.Sprintf("unregister, client.Delete:(%v), appid:(%v), hostname:(%v) success", strNode, ins.AppID, ins.Hostname)) return } @@ -336,7 +311,7 @@ func (a *appInfo) fetchstore(appID string) (err error) { //for range childs for _, child := range childs { - strNode = prefix + RootPath + child + strNode = prefix + "/" + child resp, _, err := a.zkb.cli.Get(strNode) if err != nil { log.Error("zookeeper: fetch client.Get(%s) error(%v)", strNode, err) From 56e06c61a489f5a9367e080daa6462521767a664 Mon Sep 17 00:00:00 2001 From: Tony Date: Tue, 5 Nov 2019 13:48:48 +0800 Subject: [PATCH 03/18] add zk test --- .travis.yml | 7 + go.mod | 2 +- pkg/naming/zookeeper/zookeeper.go | 197 +++++++++++++------------ pkg/naming/zookeeper/zookeeper_test.go | 50 +++++++ 4 files changed, 160 insertions(+), 96 deletions(-) create mode 100644 pkg/naming/zookeeper/zookeeper_test.go diff --git a/.travis.yml b/.travis.yml index b9bfc6413..fb8840993 100644 --- a/.travis.yml +++ b/.travis.yml @@ -23,10 +23,13 @@ env: - DOCKER_COMPOSE_VERSION=1.24.1 before_install: + # docker-compose - sudo rm /usr/local/bin/docker-compose - curl -L https://github.com/docker/compose/releases/download/${DOCKER_COMPOSE_VERSION}/docker-compose-`uname -s`-`uname -m` > docker-compose - chmod +x docker-compose - sudo mv docker-compose /usr/local/bin + # zookeeper + - "sudo apt-get install -y zookeeper-server" # Skip the install step. Don't `go get` dependencies. Only build with the code # in vendor/ @@ -37,9 +40,13 @@ install: true # Make sure golangci-lint is vendored. before_script: - curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh| sh -s -- -b $GOPATH/bin + # discovery - curl -sfL https://raw.githubusercontent.com/bilibili/discovery/master/install.sh | sh -s -- -b $GOPATH/bin - curl -sfL https://raw.githubusercontent.com/bilibili/discovery/master/cmd/discovery/discovery-example.toml -o $GOPATH/bin/discovery.toml - nohup bash -c "$GOPATH/bin/discovery -conf $GOPATH/bin/discovery.toml &" + # zookeeper + - "sudo service zookeeper-server init" + - "sudo service zookeeper-server start" script: - go build ./... diff --git a/go.mod b/go.mod index 6efa655a2..8a81255ea 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/go-playground/locales v0.12.1 // indirect github.com/go-playground/universal-translator v0.16.0 // indirect github.com/go-sql-driver/mysql v1.4.1 - github.com/go-zookeeper/zk v1.0.1 // indirect + github.com/go-zookeeper/zk v1.0.1 github.com/gogo/protobuf v1.3.0 github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect github.com/golang/mock v1.3.1 // indirect diff --git a/pkg/naming/zookeeper/zookeeper.go b/pkg/naming/zookeeper/zookeeper.go index 9e50f130f..92662a058 100644 --- a/pkg/naming/zookeeper/zookeeper.go +++ b/pkg/naming/zookeeper/zookeeper.go @@ -5,25 +5,30 @@ import ( "encoding/json" "errors" "fmt" + "path" + "strings" "sync" "sync/atomic" "time" "github.com/bilibili/kratos/pkg/log" "github.com/bilibili/kratos/pkg/naming" + xtime "github.com/bilibili/kratos/pkg/time" "github.com/go-zookeeper/zk" ) +// Config is zookeeper config. type Config struct { - // Endpoints is a list of URLs. - Endpoints []string `json:"endpoints"` + Root string `json:"root"` + Endpoints []string `json:"endpoints"` + Timeout xtime.Duration `json:"timeout"` } var ( _once sync.Once _builder naming.Builder - //ErrDuplication is a register duplication err + // ErrDuplication is a register duplication err ErrDuplication = errors.New("zookeeper: instance duplicate registration") ) @@ -40,22 +45,10 @@ func Build(c *Config, id string) naming.Resolver { return Builder(c).Build(id) } -// ZookeeperBuilder is a zookeeper client Builder -type ZookeeperBuilder struct { - cli *zk.Conn - connEvent <-chan zk.Event - ctx context.Context - cancelFunc context.CancelFunc - - mutex sync.RWMutex - apps map[string]*appInfo - registry map[string]struct{} -} - type appInfo struct { resolver map[*Resolve]struct{} ins atomic.Value - zkb *ZookeeperBuilder + zkb *Zookeeper once sync.Once } @@ -63,28 +56,43 @@ type appInfo struct { type Resolve struct { id string event chan struct{} - zkb *ZookeeperBuilder + zkb *Zookeeper +} + +// Zookeeper is a zookeeper client Builder. +type Zookeeper struct { + c *Config + cli *zk.Conn + connEvent <-chan zk.Event + ctx context.Context + cancelFunc context.CancelFunc + + mutex sync.RWMutex + apps map[string]*appInfo + registry map[string]struct{} } -// New is new a zookeeper builder -func New(c *Config) (zkb *ZookeeperBuilder, err error) { - //example: endpointSli = []string{"192.168.1.78:2181", "192.168.1.79:2181", "192.168.1.80:2181"} +// New is new a zookeeper builder. +func New(c *Config) (zkb *Zookeeper, err error) { + if c.Timeout == 0 { + c.Timeout = xtime.Duration(time.Second) + } if len(c.Endpoints) == 0 { errInfo := fmt.Sprintf("zookeeper New failed, endpoints is null") log.Error(errInfo) return nil, errors.New(errInfo) } - zkConn, connEvent, err := zk.Connect(c.Endpoints, 5*time.Second) + zkConn, connEvent, err := zk.Connect(c.Endpoints, time.Duration(c.Timeout)) if err != nil { log.Error(fmt.Sprintf("zk Connect err:(%v)", err)) return - } else { - log.Info(fmt.Sprintf("zk Connect ok!")) } + log.Info(fmt.Sprintf("zk Connect ok!")) ctx, cancel := context.WithCancel(context.Background()) - zkb = &ZookeeperBuilder{ + zkb = &Zookeeper{ + c: c, cli: zkConn, connEvent: connEvent, ctx: ctx, @@ -96,7 +104,7 @@ func New(c *Config) (zkb *ZookeeperBuilder, err error) { } // Build zookeeper resovler builder. -func (z *ZookeeperBuilder) Build(appid string) naming.Resolver { +func (z *Zookeeper) Build(appid string) naming.Resolver { r := &Resolve{ id: appid, zkb: z, @@ -119,21 +127,19 @@ func (z *ZookeeperBuilder) Build(appid string) naming.Resolver { default: } } - app.once.Do(func() { go app.watch(appid) - log.Info("zookeeper: AddWatch(%s) already watch(%v)", appid, ok) }) return r } -// Scheme return zookeeper's scheme -func (z *ZookeeperBuilder) Scheme() string { +// Scheme return zookeeper's scheme. +func (z *Zookeeper) Scheme() string { return "zookeeper" } -// Register is register instance -func (z *ZookeeperBuilder) Register(ctx context.Context, ins *naming.Instance) (cancelFunc context.CancelFunc, err error) { +// Register is register instance. +func (z *Zookeeper) Register(ctx context.Context, ins *naming.Instance) (cancelFunc context.CancelFunc, err error) { z.mutex.Lock() if _, ok := z.registry[ins.AppID]; ok { err = ErrDuplication @@ -157,16 +163,13 @@ func (z *ZookeeperBuilder) Register(ctx context.Context, ins *naming.Instance) ( cancel() <-ch }) - go func() { for { select { case connEvent := <-z.connEvent: - log.Warn("watch zkClient state, connEvent:(%v)", connEvent) + log.Info("watch zkClient state, connEvent:(%+v)", connEvent) if connEvent.State == zk.StateHasSession { - log.Warn("watch zkClient state, state is StateHasSession...") - err = z.register(ctx, ins) - if err != nil { + if err = z.register(ctx, ins); err != nil { log.Warn(fmt.Sprintf("watch zkClient state, fail to register node error:(%v)", err)) continue } @@ -180,110 +183,120 @@ func (z *ZookeeperBuilder) Register(ctx context.Context, ins *naming.Instance) ( return } -func (z *ZookeeperBuilder) registerPerServer(name string) (err error) { +func (z *Zookeeper) createPath(paths string) error { var ( - str string + lastPath = "/" + seps = strings.Split(paths, "/") ) - - str, err = z.cli.Create(name, nil, 0, zk.WorldACL(zk.PermAll)) - if err != nil { - log.Warn(fmt.Sprintf("registerPerServer, fail to Create node:(%s). err:(%v)", name, err)) - } else { - log.Info(fmt.Sprintf("registerPerServer, succeed to Create node:(%s). retStr:(%s)", name, str)) + for _, part := range seps { + if part == "" { + continue + } + lastPath = path.Join(lastPath, part) + ok, _, err := z.cli.Exists(lastPath) + if err != nil { + return err + } + if ok { + continue + } + ret, err := z.cli.Create(lastPath, nil, 0, zk.WorldACL(zk.PermAll)) + if err != nil { + log.Warn(fmt.Sprintf("createPath, fail to Create node:(%s). error:(%v)", paths, err)) + return err + } + log.Info(fmt.Sprintf("createPath, succeed to Create node:(%s). retStr:(%s)", paths, ret)) } - - return + return nil } -func (z *ZookeeperBuilder) registerEphServer(name, host string, ins *naming.Instance) (err error) { +func (z *Zookeeper) registerPeerServer(name, host string, ins *naming.Instance) (err error) { var ( str string ) - - val, _ := json.Marshal(ins) - log.Info(fmt.Sprintf("registerEphServer, ins after json.Marshal:(%v)", string(val))) - + val, err := json.Marshal(ins) + if err != nil { + return + } + log.Info(fmt.Sprintf("registerPeerServer, ins after json.Marshal:(%v)", string(val))) str, err = z.cli.Create(name+host, val, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) if err != nil { - log.Warn(fmt.Sprintf("registerEphServer, fail to Create node:%s. err:(%v)", name+host, err)) + log.Warn(fmt.Sprintf("registerPeerServer, fail to Create node:%s. error:(%v)", name+host, err)) } else { - log.Info(fmt.Sprintf("registerEphServer, succeed to Create node:%s. retStr:(%s)", name+host, str)) + log.Info(fmt.Sprintf("registerPeerServer, succeed to Create node:%s. retStr:(%s)", name+host, str)) } - return } -// register 注册zookeeper节点 -func (z *ZookeeperBuilder) register(ctx context.Context, ins *naming.Instance) (err error) { +// register is register instance to zookeeper. +func (z *Zookeeper) register(ctx context.Context, ins *naming.Instance) (err error) { log.Info("zookeeper register enter, instance Addrs:(%v)", ins.Addrs) - prefix := z.keyPrefix(ins) - err = z.registerPerServer(prefix) - if err != nil { - log.Warn(fmt.Sprintf("register, fail to registerPerServer node error:(%v)", err)) + prefix := z.keyPrefix(ins.AppID) + if err = z.createPath(prefix); err != nil { + log.Warn(fmt.Sprintf("register, fail to createPath node error:(%v)", err)) } - - for _, val := range ins.Addrs { - err = z.registerEphServer(prefix, "/"+val, ins) - if err != nil { - log.Warn(fmt.Sprintf("registerServer, fail to RegisterEphServer node error:(%v)", err)) + for _, addr := range ins.Addrs { + addr = strings.Replace(addr, "://", ":", 1) + if err = z.registerPeerServer(prefix, "/"+addr, ins); err != nil { + log.Warn(fmt.Sprintf("registerServer, fail to RegisterPeerServer node:%s error:(%v)", addr, err)) } else { log.Info(fmt.Sprintf("registerServer, succeed to RegistServer node.")) } } - return nil } -// unregister 删除zookeeper中节点信息 -func (z *ZookeeperBuilder) unregister(ins *naming.Instance) (err error) { +func (z *Zookeeper) unregister(ins *naming.Instance) (err error) { log.Info("zookeeper unregister enter, instance Addrs:(%v)", ins.Addrs) - prefix := z.keyPrefix(ins) - - for _, val := range ins.Addrs { - strNode := prefix + "/" + val + prefix := z.keyPrefix(ins.AppID) + for _, addr := range ins.Addrs { + addr = strings.Replace(addr, ":", "://", 1) + strNode := prefix + "/" + addr exists, _, err := z.cli.Exists(strNode) if err != nil { - log.Error("zk.Conn.Exists node:(%v), error:(%s)", strNode, err.Error()) - return err + log.Error("zk.Conn.Exists node:(%v), error:(%v)", strNode, err) + continue } if exists { _, s, err := z.cli.Get(strNode) if err != nil { - log.Error("zk.Conn.Get node:(%s), error:(%s)", strNode, err.Error()) - return err + log.Error("zk.Conn.Get node:(%s), error:(%v)", strNode, err) + continue + } + if err = z.cli.Delete(strNode, s.Version); err != nil { + log.Error("zk.Conn.Delete node:(%s), error:(%v)", strNode, err) + continue } - return z.cli.Delete(strNode, s.Version) } log.Info(fmt.Sprintf("unregister, client.Delete:(%v), appid:(%v), hostname:(%v) success", strNode, ins.AppID, ins.Hostname)) } - return } -func (z *ZookeeperBuilder) keyPrefix(ins *naming.Instance) string { - return fmt.Sprintf("/%s", ins.AppID) +func (z *Zookeeper) keyPrefix(appID string) string { + return path.Join(z.c.Root, appID) } -// Close stop all running process including zk fetch and register -func (z *ZookeeperBuilder) Close() error { +// Close stop all running process including zk fetch and register. +func (z *Zookeeper) Close() error { z.cancelFunc() return nil } func (a *appInfo) watch(appID string) { _ = a.fetchstore(appID) - prefix := fmt.Sprintf("/%s", appID) - + prefix := a.zkb.keyPrefix(appID) go func() { for { log.Info(fmt.Sprintf("zk ChildrenW enter, prefix:(%v)", prefix)) snapshot, _, event, err := a.zkb.cli.ChildrenW(prefix) if err != nil { + log.Error("zk ChildrenW fail to watch:%s error:(%v)", prefix, err) + time.Sleep(time.Second) continue } - log.Info(fmt.Sprintf("zk ChildrenW ok, snapshot:(%v)", snapshot)) for ev := range event { log.Info(fmt.Sprintf("zk ChildrenW ok, prefix:(%v), event Path:(%v), Type:(%v)", prefix, ev.Path, ev.Type)) @@ -296,28 +309,24 @@ func (a *appInfo) watch(appID string) { } func (a *appInfo) fetchstore(appID string) (err error) { - prefix := fmt.Sprintf("/%s", appID) - strNode := "" + prefix := a.zkb.keyPrefix(appID) childs, _, err := a.zkb.cli.Children(prefix) if err != nil { - log.Error(fmt.Sprintf("fetchstore, fail to get Children of node:(%v), err:(%v)", prefix, err)) + log.Error(fmt.Sprintf("fetchstore, fail to get Children of node:(%v), error:(%v)", prefix, err)) } else { log.Info(fmt.Sprintf("fetchstore, ok to get Children of node:(%v), childs:(%v)", prefix, childs)) } - ins := &naming.InstancesInfo{ Instances: make(map[string][]*naming.Instance, 0), } - - //for range childs + strNode := "" for _, child := range childs { strNode = prefix + "/" + child resp, _, err := a.zkb.cli.Get(strNode) if err != nil { - log.Error("zookeeper: fetch client.Get(%s) error(%v)", strNode, err) + log.Error("zookeeper: fetch client.Get(%s) error:(%v)", strNode, err) return err } - in := new(naming.Instance) err = json.Unmarshal(resp, in) if err != nil { @@ -327,12 +336,10 @@ func (a *appInfo) fetchstore(appID string) (err error) { } a.store(ins) - return nil } func (a *appInfo) store(ins *naming.InstancesInfo) { - a.ins.Store(ins) a.zkb.mutex.RLock() for rs := range a.resolver { diff --git a/pkg/naming/zookeeper/zookeeper_test.go b/pkg/naming/zookeeper/zookeeper_test.go new file mode 100644 index 000000000..d958527af --- /dev/null +++ b/pkg/naming/zookeeper/zookeeper_test.go @@ -0,0 +1,50 @@ +package zookeeper + +import ( + "context" + "testing" + "time" + + "github.com/bilibili/kratos/pkg/naming" +) + +var ( + _testAppid = "test_appid" + + _testConf = &Config{ + Root: "/test", + Endpoints: []string{"127.0.0.1:2181"}, + } + _testIns = &naming.Instance{ + AppID: _testAppid, + Addrs: []string{"grpc://127.0.0.1:9000"}, + Metadata: map[string]string{ + "test_key": "test_value", + }, + } +) + +func TestZookeeper(t *testing.T) { + zk, err := New(_testConf) + if err != nil { + t.Fatal(err) + } + res := zk.Build(_testAppid) + go func() { + for event := range res.Watch() { + t.Log(event) + } + }() + _, err = zk.Register(context.TODO(), _testIns) + if err != nil { + t.Fatal(err) + } + time.Sleep(time.Second) + in, ok := res.Fetch(context.TODO()) + if !ok { + t.Fatal("failed to resolver fetch") + } + if len(in.Instances) != 1 { + t.Fatalf("Instances not match, got:%d want:1", len(in.Instances)) + } +} From a1ebd91257066d017e219b1a49db593daed4c0a4 Mon Sep 17 00:00:00 2001 From: Tony Date: Tue, 5 Nov 2019 13:59:20 +0800 Subject: [PATCH 04/18] fix zk download --- .travis.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index fb8840993..c0d6dd93d 100644 --- a/.travis.yml +++ b/.travis.yml @@ -29,7 +29,8 @@ before_install: - chmod +x docker-compose - sudo mv docker-compose /usr/local/bin # zookeeper - - "sudo apt-get install -y zookeeper-server" + - wget "http://apache.cs.utah.edu/zookeeper/zookeeper-${ZK_VERSION}/zookeeper-3.5.4-beta.tar.gz" + - tar -xvf "zookeeper-3.5.4-beta.tar.gz" # Skip the install step. Don't `go get` dependencies. Only build with the code # in vendor/ @@ -45,8 +46,7 @@ before_script: - curl -sfL https://raw.githubusercontent.com/bilibili/discovery/master/cmd/discovery/discovery-example.toml -o $GOPATH/bin/discovery.toml - nohup bash -c "$GOPATH/bin/discovery -conf $GOPATH/bin/discovery.toml &" # zookeeper - - "sudo service zookeeper-server init" - - "sudo service zookeeper-server start" + - ./zookeeper-3.5.4-beta/bin/zkServer.sh start ./zk/conf/zoo_sample.cfg 1> /dev/null script: - go build ./... From 15fdcfa0851198fc8b951f574d66e9488ffb557b Mon Sep 17 00:00:00 2001 From: Tony Date: Tue, 5 Nov 2019 14:02:46 +0800 Subject: [PATCH 05/18] fix zk version --- .travis.yml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index c0d6dd93d..63de691cf 100644 --- a/.travis.yml +++ b/.travis.yml @@ -21,6 +21,7 @@ env: - DISCOVERY_NODES=127.0.0.1:7171 - HTTP_PERF=tcp://0.0.0.0:0 - DOCKER_COMPOSE_VERSION=1.24.1 + - ZK_VERSION=3.5.4-beta before_install: # docker-compose @@ -30,7 +31,7 @@ before_install: - sudo mv docker-compose /usr/local/bin # zookeeper - wget "http://apache.cs.utah.edu/zookeeper/zookeeper-${ZK_VERSION}/zookeeper-3.5.4-beta.tar.gz" - - tar -xvf "zookeeper-3.5.4-beta.tar.gz" + - tar -xvf "zookeeper-${ZK_VERSION}.tar.gz" # Skip the install step. Don't `go get` dependencies. Only build with the code # in vendor/ @@ -46,7 +47,7 @@ before_script: - curl -sfL https://raw.githubusercontent.com/bilibili/discovery/master/cmd/discovery/discovery-example.toml -o $GOPATH/bin/discovery.toml - nohup bash -c "$GOPATH/bin/discovery -conf $GOPATH/bin/discovery.toml &" # zookeeper - - ./zookeeper-3.5.4-beta/bin/zkServer.sh start ./zk/conf/zoo_sample.cfg 1> /dev/null + - ./zookeeper-${ZK_VERSION}/bin/zkServer.sh start ./zk/conf/zoo_sample.cfg 1> /dev/null script: - go build ./... From 6f438212c72110757069e0380ef110f737b073aa Mon Sep 17 00:00:00 2001 From: Tony Date: Tue, 5 Nov 2019 14:08:19 +0800 Subject: [PATCH 06/18] fix zk version --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 63de691cf..75d1e54f5 100644 --- a/.travis.yml +++ b/.travis.yml @@ -21,7 +21,7 @@ env: - DISCOVERY_NODES=127.0.0.1:7171 - HTTP_PERF=tcp://0.0.0.0:0 - DOCKER_COMPOSE_VERSION=1.24.1 - - ZK_VERSION=3.5.4-beta + - ZK_VERSION=3.5.5 before_install: # docker-compose From 6e09ca9b6421c40f9e6b62e208ca2733b202b9f5 Mon Sep 17 00:00:00 2001 From: Tony Date: Tue, 5 Nov 2019 14:11:41 +0800 Subject: [PATCH 07/18] fix zk version --- .travis.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.travis.yml b/.travis.yml index 75d1e54f5..31533d606 100644 --- a/.travis.yml +++ b/.travis.yml @@ -21,7 +21,7 @@ env: - DISCOVERY_NODES=127.0.0.1:7171 - HTTP_PERF=tcp://0.0.0.0:0 - DOCKER_COMPOSE_VERSION=1.24.1 - - ZK_VERSION=3.5.5 + - ZK_VERSION=3.5.6 before_install: # docker-compose @@ -30,7 +30,7 @@ before_install: - chmod +x docker-compose - sudo mv docker-compose /usr/local/bin # zookeeper - - wget "http://apache.cs.utah.edu/zookeeper/zookeeper-${ZK_VERSION}/zookeeper-3.5.4-beta.tar.gz" + - wget "http://apache.cs.utah.edu/zookeeper/zookeeper-${ZK_VERSION}/zookeeper-${ZK_VERSION}.tar.gz" - tar -xvf "zookeeper-${ZK_VERSION}.tar.gz" # Skip the install step. Don't `go get` dependencies. Only build with the code From d70dc25bd2debda0cfbf0cfb222d4fb58cbdd57c Mon Sep 17 00:00:00 2001 From: Tony Date: Tue, 5 Nov 2019 14:14:32 +0800 Subject: [PATCH 08/18] fix zk name --- .travis.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 31533d606..342a9adbf 100644 --- a/.travis.yml +++ b/.travis.yml @@ -30,8 +30,8 @@ before_install: - chmod +x docker-compose - sudo mv docker-compose /usr/local/bin # zookeeper - - wget "http://apache.cs.utah.edu/zookeeper/zookeeper-${ZK_VERSION}/zookeeper-${ZK_VERSION}.tar.gz" - - tar -xvf "zookeeper-${ZK_VERSION}.tar.gz" + - wget "http://apache.cs.utah.edu/zookeeper/zookeeper-${ZK_VERSION}/apache-zookeeper-${ZK_VERSION}.tar.gz" + - tar -xvf "apache-zookeeper-${ZK_VERSION}.tar.gz" # Skip the install step. Don't `go get` dependencies. Only build with the code # in vendor/ @@ -47,7 +47,7 @@ before_script: - curl -sfL https://raw.githubusercontent.com/bilibili/discovery/master/cmd/discovery/discovery-example.toml -o $GOPATH/bin/discovery.toml - nohup bash -c "$GOPATH/bin/discovery -conf $GOPATH/bin/discovery.toml &" # zookeeper - - ./zookeeper-${ZK_VERSION}/bin/zkServer.sh start ./zk/conf/zoo_sample.cfg 1> /dev/null + - ./apache-zookeeper-${ZK_VERSION}/bin/zkServer.sh start ./zk/conf/zoo_sample.cfg 1> /dev/null script: - go build ./... From 37162615b57506d1f78b9d5cee623cd8aa6d420d Mon Sep 17 00:00:00 2001 From: Tony Date: Tue, 5 Nov 2019 14:17:19 +0800 Subject: [PATCH 09/18] fix zk --- .travis.yml | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/.travis.yml b/.travis.yml index 342a9adbf..6e4cd0b18 100644 --- a/.travis.yml +++ b/.travis.yml @@ -30,8 +30,9 @@ before_install: - chmod +x docker-compose - sudo mv docker-compose /usr/local/bin # zookeeper - - wget "http://apache.cs.utah.edu/zookeeper/zookeeper-${ZK_VERSION}/apache-zookeeper-${ZK_VERSION}.tar.gz" - - tar -xvf "apache-zookeeper-${ZK_VERSION}.tar.gz" + - wget "http://apache.cs.utah.edu/zookeeper/zookeeper-${ZK_VERSION}/apache-zookeeper-${ZK_VERSION}-bin.tar.gz" + - tar -xvf "apache-zookeeper-${ZK_VERSION}-bin.tar.gz" + - chmod +x ./apache-zookeeper-${ZK_VERSION}-bin/bin/zkServer.sh # Skip the install step. Don't `go get` dependencies. Only build with the code # in vendor/ @@ -47,7 +48,7 @@ before_script: - curl -sfL https://raw.githubusercontent.com/bilibili/discovery/master/cmd/discovery/discovery-example.toml -o $GOPATH/bin/discovery.toml - nohup bash -c "$GOPATH/bin/discovery -conf $GOPATH/bin/discovery.toml &" # zookeeper - - ./apache-zookeeper-${ZK_VERSION}/bin/zkServer.sh start ./zk/conf/zoo_sample.cfg 1> /dev/null + - ./apache-zookeeper-${ZK_VERSION}-bin/bin/zkServer.sh start ./zk/conf/zoo_sample.cfg 1> /dev/null script: - go build ./... From 48ec8a47b6c496113789787461b10f1387c2f1e7 Mon Sep 17 00:00:00 2001 From: Tony Date: Tue, 5 Nov 2019 14:20:36 +0800 Subject: [PATCH 10/18] fix zk permission --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 6e4cd0b18..eef8eb253 100644 --- a/.travis.yml +++ b/.travis.yml @@ -48,7 +48,7 @@ before_script: - curl -sfL https://raw.githubusercontent.com/bilibili/discovery/master/cmd/discovery/discovery-example.toml -o $GOPATH/bin/discovery.toml - nohup bash -c "$GOPATH/bin/discovery -conf $GOPATH/bin/discovery.toml &" # zookeeper - - ./apache-zookeeper-${ZK_VERSION}-bin/bin/zkServer.sh start ./zk/conf/zoo_sample.cfg 1> /dev/null + - sudo ./apache-zookeeper-${ZK_VERSION}-bin/bin/zkServer.sh start ./zk/conf/zoo_sample.cfg 1> /dev/null script: - go build ./... From 6b412bb78a38ac0c8eb751cdd8bceb69bdaa6469 Mon Sep 17 00:00:00 2001 From: Tony Date: Tue, 5 Nov 2019 14:29:28 +0800 Subject: [PATCH 11/18] fix zk path --- .travis.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index eef8eb253..9e604004a 100644 --- a/.travis.yml +++ b/.travis.yml @@ -32,6 +32,7 @@ before_install: # zookeeper - wget "http://apache.cs.utah.edu/zookeeper/zookeeper-${ZK_VERSION}/apache-zookeeper-${ZK_VERSION}-bin.tar.gz" - tar -xvf "apache-zookeeper-${ZK_VERSION}-bin.tar.gz" + - mv apache-zookeeper-${ZK_VERSION}-bin zk - chmod +x ./apache-zookeeper-${ZK_VERSION}-bin/bin/zkServer.sh # Skip the install step. Don't `go get` dependencies. Only build with the code @@ -48,7 +49,7 @@ before_script: - curl -sfL https://raw.githubusercontent.com/bilibili/discovery/master/cmd/discovery/discovery-example.toml -o $GOPATH/bin/discovery.toml - nohup bash -c "$GOPATH/bin/discovery -conf $GOPATH/bin/discovery.toml &" # zookeeper - - sudo ./apache-zookeeper-${ZK_VERSION}-bin/bin/zkServer.sh start ./zk/conf/zoo_sample.cfg 1> /dev/null + - sudo ./zk/bin/zkServer.sh start ./zk/conf/zoo_sample.cfg 1> /dev/null script: - go build ./... From 4f915134ab6910d1abad4152472e04439f6e3cd2 Mon Sep 17 00:00:00 2001 From: Tony Date: Tue, 5 Nov 2019 14:32:00 +0800 Subject: [PATCH 12/18] fix zk path --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index 9e604004a..3d041a9b6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -33,7 +33,7 @@ before_install: - wget "http://apache.cs.utah.edu/zookeeper/zookeeper-${ZK_VERSION}/apache-zookeeper-${ZK_VERSION}-bin.tar.gz" - tar -xvf "apache-zookeeper-${ZK_VERSION}-bin.tar.gz" - mv apache-zookeeper-${ZK_VERSION}-bin zk - - chmod +x ./apache-zookeeper-${ZK_VERSION}-bin/bin/zkServer.sh + - chmod +x ./zk/bin/zkServer.sh # Skip the install step. Don't `go get` dependencies. Only build with the code # in vendor/ From 264e0d6c06b369645c84a2ee8b176987fb1aad63 Mon Sep 17 00:00:00 2001 From: Tony Date: Tue, 5 Nov 2019 22:55:44 +0800 Subject: [PATCH 13/18] fix empty path --- pkg/naming/zookeeper/zookeeper.go | 54 +++++++++++++++++--------- pkg/naming/zookeeper/zookeeper_test.go | 2 +- 2 files changed, 36 insertions(+), 20 deletions(-) diff --git a/pkg/naming/zookeeper/zookeeper.go b/pkg/naming/zookeeper/zookeeper.go index 92662a058..cbcffa3e0 100644 --- a/pkg/naming/zookeeper/zookeeper.go +++ b/pkg/naming/zookeeper/zookeeper.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "net/url" "path" "strings" "sync" @@ -203,14 +204,14 @@ func (z *Zookeeper) createPath(paths string) error { ret, err := z.cli.Create(lastPath, nil, 0, zk.WorldACL(zk.PermAll)) if err != nil { log.Warn(fmt.Sprintf("createPath, fail to Create node:(%s). error:(%v)", paths, err)) - return err + } else { + log.Info(fmt.Sprintf("createPath, succeed to Create node:(%s). retStr:(%s)", paths, ret)) } - log.Info(fmt.Sprintf("createPath, succeed to Create node:(%s). retStr:(%s)", paths, ret)) } return nil } -func (z *Zookeeper) registerPeerServer(name, host string, ins *naming.Instance) (err error) { +func (z *Zookeeper) registerPeerServer(nodePath string, ins *naming.Instance) (err error) { var ( str string ) @@ -219,11 +220,18 @@ func (z *Zookeeper) registerPeerServer(name, host string, ins *naming.Instance) return } log.Info(fmt.Sprintf("registerPeerServer, ins after json.Marshal:(%v)", string(val))) - str, err = z.cli.Create(name+host, val, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) + ok, _, err := z.cli.Exists(nodePath) if err != nil { - log.Warn(fmt.Sprintf("registerPeerServer, fail to Create node:%s. error:(%v)", name+host, err)) + return err + } + if ok { + return nil + } + str, err = z.cli.Create(nodePath, val, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) + if err != nil { + log.Warn(fmt.Sprintf("registerPeerServer, fail to Create node:%s. error:(%v)", nodePath, err)) } else { - log.Info(fmt.Sprintf("registerPeerServer, succeed to Create node:%s. retStr:(%s)", name+host, str)) + log.Info(fmt.Sprintf("registerPeerServer, succeed to Create node:%s. retStr:(%s)", nodePath, str)) } return } @@ -237,8 +245,12 @@ func (z *Zookeeper) register(ctx context.Context, ins *naming.Instance) (err err log.Warn(fmt.Sprintf("register, fail to createPath node error:(%v)", err)) } for _, addr := range ins.Addrs { - addr = strings.Replace(addr, "://", ":", 1) - if err = z.registerPeerServer(prefix, "/"+addr, ins); err != nil { + u, err := url.Parse(addr) + if err != nil { + continue + } + nodePath := prefix + "/" + strings.SplitN(u.Host, ":", 2)[0] + if err = z.registerPeerServer(nodePath, ins); err != nil { log.Warn(fmt.Sprintf("registerServer, fail to RegisterPeerServer node:%s error:(%v)", addr, err)) } else { log.Info(fmt.Sprintf("registerServer, succeed to RegistServer node.")) @@ -251,26 +263,29 @@ func (z *Zookeeper) unregister(ins *naming.Instance) (err error) { log.Info("zookeeper unregister enter, instance Addrs:(%v)", ins.Addrs) prefix := z.keyPrefix(ins.AppID) for _, addr := range ins.Addrs { - addr = strings.Replace(addr, ":", "://", 1) - strNode := prefix + "/" + addr - exists, _, err := z.cli.Exists(strNode) + u, err := url.Parse(addr) + if err != nil { + continue + } + nodePath := prefix + "/" + strings.SplitN(u.Host, ":", 2)[0] + exists, _, err := z.cli.Exists(nodePath) if err != nil { - log.Error("zk.Conn.Exists node:(%v), error:(%v)", strNode, err) + log.Error("zk.Conn.Exists node:(%v), error:(%v)", nodePath, err) continue } if exists { - _, s, err := z.cli.Get(strNode) + _, s, err := z.cli.Get(nodePath) if err != nil { - log.Error("zk.Conn.Get node:(%s), error:(%v)", strNode, err) + log.Error("zk.Conn.Get node:(%s), error:(%v)", nodePath, err) continue } - if err = z.cli.Delete(strNode, s.Version); err != nil { - log.Error("zk.Conn.Delete node:(%s), error:(%v)", strNode, err) + if err = z.cli.Delete(nodePath, s.Version); err != nil { + log.Error("zk.Conn.Delete node:(%s), error:(%v)", nodePath, err) continue } } - log.Info(fmt.Sprintf("unregister, client.Delete:(%v), appid:(%v), hostname:(%v) success", strNode, ins.AppID, ins.Hostname)) + log.Info(fmt.Sprintf("unregister, client.Delete:(%v), appid:(%v), hostname:(%v) success", nodePath, ins.AppID, ins.Hostname)) } return } @@ -287,8 +302,8 @@ func (z *Zookeeper) Close() error { func (a *appInfo) watch(appID string) { _ = a.fetchstore(appID) - prefix := a.zkb.keyPrefix(appID) go func() { + prefix := a.zkb.keyPrefix(appID) for { log.Info(fmt.Sprintf("zk ChildrenW enter, prefix:(%v)", prefix)) snapshot, _, event, err := a.zkb.cli.ChildrenW(prefix) @@ -297,7 +312,7 @@ func (a *appInfo) watch(appID string) { time.Sleep(time.Second) continue } - log.Info(fmt.Sprintf("zk ChildrenW ok, snapshot:(%v)", snapshot)) + log.Info(fmt.Sprintf("zk ChildrenW ok, prefix:%s snapshot:(%v)", prefix, snapshot)) for ev := range event { log.Info(fmt.Sprintf("zk ChildrenW ok, prefix:(%v), event Path:(%v), Type:(%v)", prefix, ev.Path, ev.Type)) if ev.Type == zk.EventNodeChildrenChanged { @@ -310,6 +325,7 @@ func (a *appInfo) watch(appID string) { func (a *appInfo) fetchstore(appID string) (err error) { prefix := a.zkb.keyPrefix(appID) + a.zkb.createPath(prefix) childs, _, err := a.zkb.cli.Children(prefix) if err != nil { log.Error(fmt.Sprintf("fetchstore, fail to get Children of node:(%v), error:(%v)", prefix, err)) diff --git a/pkg/naming/zookeeper/zookeeper_test.go b/pkg/naming/zookeeper/zookeeper_test.go index d958527af..5b5cc0ec0 100644 --- a/pkg/naming/zookeeper/zookeeper_test.go +++ b/pkg/naming/zookeeper/zookeeper_test.go @@ -39,7 +39,7 @@ func TestZookeeper(t *testing.T) { if err != nil { t.Fatal(err) } - time.Sleep(time.Second) + time.Sleep(time.Second * 3) in, ok := res.Fetch(context.TODO()) if !ok { t.Fatal("failed to resolver fetch") From 4b11f444192a4690a2b46f4e1c5fd6b0bff16937 Mon Sep 17 00:00:00 2001 From: Tony Date: Tue, 5 Nov 2019 23:16:37 +0800 Subject: [PATCH 14/18] fix watch event --- pkg/naming/zookeeper/zookeeper.go | 18 +++++++++--------- pkg/naming/zookeeper/zookeeper_test.go | 10 +++------- 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/pkg/naming/zookeeper/zookeeper.go b/pkg/naming/zookeeper/zookeeper.go index cbcffa3e0..92a203ba0 100644 --- a/pkg/naming/zookeeper/zookeeper.go +++ b/pkg/naming/zookeeper/zookeeper.go @@ -249,6 +249,7 @@ func (z *Zookeeper) register(ctx context.Context, ins *naming.Instance) (err err if err != nil { continue } + // grpc://127.0.0.1:8000 to 127.0.0.1 nodePath := prefix + "/" + strings.SplitN(u.Host, ":", 2)[0] if err = z.registerPeerServer(nodePath, ins); err != nil { log.Warn(fmt.Sprintf("registerServer, fail to RegisterPeerServer node:%s error:(%v)", addr, err)) @@ -267,6 +268,7 @@ func (z *Zookeeper) unregister(ins *naming.Instance) (err error) { if err != nil { continue } + // grpc://127.0.0.1:8000 to 127.0.0.1 nodePath := prefix + "/" + strings.SplitN(u.Host, ":", 2)[0] exists, _, err := z.cli.Exists(nodePath) if err != nil { @@ -310,6 +312,7 @@ func (a *appInfo) watch(appID string) { if err != nil { log.Error("zk ChildrenW fail to watch:%s error:(%v)", prefix, err) time.Sleep(time.Second) + _ = a.fetchstore(appID) continue } log.Info(fmt.Sprintf("zk ChildrenW ok, prefix:%s snapshot:(%v)", prefix, snapshot)) @@ -325,27 +328,24 @@ func (a *appInfo) watch(appID string) { func (a *appInfo) fetchstore(appID string) (err error) { prefix := a.zkb.keyPrefix(appID) - a.zkb.createPath(prefix) childs, _, err := a.zkb.cli.Children(prefix) if err != nil { log.Error(fmt.Sprintf("fetchstore, fail to get Children of node:(%v), error:(%v)", prefix, err)) - } else { - log.Info(fmt.Sprintf("fetchstore, ok to get Children of node:(%v), childs:(%v)", prefix, childs)) + return } + log.Info(fmt.Sprintf("fetchstore, ok to get Children of node:(%v), childs:(%v)", prefix, childs)) ins := &naming.InstancesInfo{ Instances: make(map[string][]*naming.Instance, 0), } - strNode := "" for _, child := range childs { - strNode = prefix + "/" + child - resp, _, err := a.zkb.cli.Get(strNode) + nodePath := prefix + "/" + child + resp, _, err := a.zkb.cli.Get(nodePath) if err != nil { - log.Error("zookeeper: fetch client.Get(%s) error:(%v)", strNode, err) + log.Error("zookeeper: fetch client.Get(%s) error:(%v)", nodePath, err) return err } in := new(naming.Instance) - err = json.Unmarshal(resp, in) - if err != nil { + if err = json.Unmarshal(resp, in); err != nil { return err } ins.Instances[in.Zone] = append(ins.Instances[in.Zone], in) diff --git a/pkg/naming/zookeeper/zookeeper_test.go b/pkg/naming/zookeeper/zookeeper_test.go index 5b5cc0ec0..192bc1883 100644 --- a/pkg/naming/zookeeper/zookeeper_test.go +++ b/pkg/naming/zookeeper/zookeeper_test.go @@ -3,7 +3,6 @@ package zookeeper import ( "context" "testing" - "time" "github.com/bilibili/kratos/pkg/naming" ) @@ -30,16 +29,13 @@ func TestZookeeper(t *testing.T) { t.Fatal(err) } res := zk.Build(_testAppid) - go func() { - for event := range res.Watch() { - t.Log(event) - } - }() + event := res.Watch() _, err = zk.Register(context.TODO(), _testIns) if err != nil { t.Fatal(err) } - time.Sleep(time.Second * 3) + <-event + <-event in, ok := res.Fetch(context.TODO()) if !ok { t.Fatal("failed to resolver fetch") From 9155dbfe6b01480f52bf8402c8b40e0c3ba680c0 Mon Sep 17 00:00:00 2001 From: Tony Date: Tue, 5 Nov 2019 23:29:05 +0800 Subject: [PATCH 15/18] fix test event --- pkg/naming/zookeeper/zookeeper_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/naming/zookeeper/zookeeper_test.go b/pkg/naming/zookeeper/zookeeper_test.go index 192bc1883..0539d8f2a 100644 --- a/pkg/naming/zookeeper/zookeeper_test.go +++ b/pkg/naming/zookeeper/zookeeper_test.go @@ -28,13 +28,13 @@ func TestZookeeper(t *testing.T) { if err != nil { t.Fatal(err) } - res := zk.Build(_testAppid) - event := res.Watch() _, err = zk.Register(context.TODO(), _testIns) if err != nil { t.Fatal(err) } - <-event + // fetch&watch + res := zk.Build(_testAppid) + event := res.Watch() <-event in, ok := res.Fetch(context.TODO()) if !ok { From 0f7fb0e66dcacc8053268864b2cd1369954e3116 Mon Sep 17 00:00:00 2001 From: Tony Date: Tue, 5 Nov 2019 23:39:47 +0800 Subject: [PATCH 16/18] fix test bind --- .../resolver/direct/test/direct_test.go | 6 +++--- pkg/net/rpc/warden/server_test.go | 19 ++++++++++--------- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/pkg/net/rpc/warden/resolver/direct/test/direct_test.go b/pkg/net/rpc/warden/resolver/direct/test/direct_test.go index b898fd643..cc3b65112 100644 --- a/pkg/net/rpc/warden/resolver/direct/test/direct_test.go +++ b/pkg/net/rpc/warden/resolver/direct/test/direct_test.go @@ -42,8 +42,8 @@ func createServer(name, listen string) *warden.Server { func TestMain(m *testing.M) { resolver.Register(direct.New()) ctx := context.TODO() - s1 := createServer("server1", "127.0.0.1:18081") - s2 := createServer("server2", "127.0.0.1:18082") + s1 := createServer("server1", "127.0.0.1:18001") + s2 := createServer("server2", "127.0.0.1:18002") defer s1.Shutdown(ctx) defer s2.Shutdown(ctx) os.Exit(m.Run()) @@ -68,7 +68,7 @@ func createTestClient(t *testing.T, connStr string) pb.GreeterClient { } func TestDirect(t *testing.T) { - cli := createTestClient(t, "direct://default/127.0.0.1:18083,127.0.0.1:18082") + cli := createTestClient(t, "direct://default/127.0.0.1:18003,127.0.0.1:18002") count := 0 for i := 0; i < 10; i++ { if resp, err := cli.SayHello(context.TODO(), &pb.HelloRequest{Age: 1, Name: "hello"}); err != nil { diff --git a/pkg/net/rpc/warden/server_test.go b/pkg/net/rpc/warden/server_test.go index 5d0ea2afa..18c508685 100644 --- a/pkg/net/rpc/warden/server_test.go +++ b/pkg/net/rpc/warden/server_test.go @@ -30,6 +30,8 @@ import ( const ( _separator = "\001" + + _testAddr = "127.0.0.1:9090" ) var ( @@ -156,7 +158,7 @@ func (s *helloServer) StreamHello(ss pb.Greeter_StreamHelloServer) error { func runServer(t *testing.T, interceptors ...grpc.UnaryServerInterceptor) func() { return func() { - server = NewServer(&ServerConfig{Addr: "127.0.0.1:8080", Timeout: xtime.Duration(time.Second)}) + server = NewServer(&ServerConfig{Addr: _testAddr, Timeout: xtime.Duration(time.Second)}) pb.RegisterGreeterServer(server.Server(), &helloServer{t}) server.Use( func(ctx context.Context, req interface{}, args *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { @@ -180,7 +182,7 @@ func runServer(t *testing.T, interceptors ...grpc.UnaryServerInterceptor) func() func runClient(ctx context.Context, cc *ClientConfig, t *testing.T, name string, age int32, interceptors ...grpc.UnaryClientInterceptor) (resp *pb.HelloReply, err error) { client := NewClient(cc) client.Use(interceptors...) - conn, err := client.Dial(context.Background(), "127.0.0.1:8080") + conn, err := client.Dial(context.Background(), _testAddr) if err != nil { panic(fmt.Errorf("did not connect: %v,req: %v %v", err, name, age)) } @@ -226,7 +228,7 @@ func testTimeoutOpt(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) defer cancel() client := NewClient(&clientConfig) - conn, err := client.Dial(ctx, "127.0.0.1:8080") + conn, err := client.Dial(ctx, _testAddr) if err != nil { t.Fatalf("did not connect: %v", err) } @@ -284,7 +286,7 @@ func testAllErrorCase(t *testing.T) { func testBreaker(t *testing.T) { client := NewClient(&clientConfig) - conn, err := client.Dial(context.Background(), "127.0.0.1:8080") + conn, err := client.Dial(context.Background(), _testAddr) if err != nil { t.Fatalf("did not connect: %v", err) } @@ -378,7 +380,7 @@ func testClientRecovery(t *testing.T) { panic("client recovery test") }) - conn, err := client.Dial(ctx, "127.0.0.1:8080") + conn, err := client.Dial(ctx, _testAddr) if err != nil { t.Fatalf("did not connect: %v", err) } @@ -403,7 +405,7 @@ func testServerRecovery(t *testing.T) { ctx := context.Background() client := NewClient(&clientConfig) - conn, err := client.Dial(ctx, "127.0.0.1:8080") + conn, err := client.Dial(ctx, _testAddr) if err != nil { t.Fatalf("did not connect: %v", err) } @@ -491,7 +493,7 @@ func testTrace(t *testing.T, port int, isStream bool) { } func BenchmarkServer(b *testing.B) { - server := NewServer(&ServerConfig{Addr: "127.0.0.1:8080", Timeout: xtime.Duration(time.Second)}) + server := NewServer(&ServerConfig{Addr: _testAddr, Timeout: xtime.Duration(time.Second)}) go func() { pb.RegisterGreeterServer(server.Server(), &helloServer{}) if _, err := server.Start(); err != nil { @@ -503,7 +505,7 @@ func BenchmarkServer(b *testing.B) { server.Server().Stop() }() client := NewClient(&clientConfig) - conn, err := client.Dial(context.Background(), "127.0.0.1:8080") + conn, err := client.Dial(context.Background(), _testAddr) if err != nil { conn.Close() b.Fatalf("did not connect: %v", err) @@ -602,4 +604,3 @@ func TestStartWithAddr(t *testing.T) { assert.Nil(t, err) } } - From 0cad7e3d5a33470180d50000945c5b75ec8578e2 Mon Sep 17 00:00:00 2001 From: Tony Date: Tue, 5 Nov 2019 23:45:50 +0800 Subject: [PATCH 17/18] add path comment --- pkg/naming/zookeeper/zookeeper.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/naming/zookeeper/zookeeper.go b/pkg/naming/zookeeper/zookeeper.go index 92a203ba0..b6221d052 100644 --- a/pkg/naming/zookeeper/zookeeper.go +++ b/pkg/naming/zookeeper/zookeeper.go @@ -61,6 +61,7 @@ type Resolve struct { } // Zookeeper is a zookeeper client Builder. +// path: /{root}/{appid}/{ip} -> json(instance) type Zookeeper struct { c *Config cli *zk.Conn From aae2db977bdc94dbfadbb670d3e3f27ba5f52515 Mon Sep 17 00:00:00 2001 From: Tony Date: Tue, 5 Nov 2019 23:54:00 +0800 Subject: [PATCH 18/18] fix test addr --- pkg/conf/paladin/apollo/apollo_test.go | 2 +- pkg/conf/paladin/apollo/internal/mockserver/mockserver.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/conf/paladin/apollo/apollo_test.go b/pkg/conf/paladin/apollo/apollo_test.go index cef79cb10..bb97f5d22 100644 --- a/pkg/conf/paladin/apollo/apollo_test.go +++ b/pkg/conf/paladin/apollo/apollo_test.go @@ -43,7 +43,7 @@ func TestApollo(t *testing.T) { os.Setenv("APOLLO_APP_ID", "SampleApp") os.Setenv("APOLLO_CLUSTER", "default") os.Setenv("APOLLO_CACHE_DIR", "/tmp") - os.Setenv("APOLLO_META_ADDR", "localhost:8080") + os.Setenv("APOLLO_META_ADDR", "localhost:8010") os.Setenv("APOLLO_NAMESPACES", fmt.Sprintf("%s,%s", testAppYAML, testClientJSON)) mockserver.Set(testAppYAML, "content", testAppYAMLContent1) mockserver.Set(testClientJSON, "content", testClientJSONContent) diff --git a/pkg/conf/paladin/apollo/internal/mockserver/mockserver.go b/pkg/conf/paladin/apollo/internal/mockserver/mockserver.go index 5ed0bc2ba..cca5b61ab 100644 --- a/pkg/conf/paladin/apollo/internal/mockserver/mockserver.go +++ b/pkg/conf/paladin/apollo/internal/mockserver/mockserver.go @@ -137,7 +137,7 @@ func initServer() { mux.Handle("/notifications/", http.HandlerFunc(server.NotificationHandler)) mux.Handle("/configs/", http.HandlerFunc(server.ConfigHandler)) server.server.Handler = mux - server.server.Addr = ":8080" + server.server.Addr = ":8010" } // Close mock server