From fa750fdff759af04d90ab5864efbd33764270985 Mon Sep 17 00:00:00 2001 From: guobin <596194783@qq.com> Date: Mon, 21 Oct 2019 11:08:25 +0800 Subject: [PATCH 1/2] =?UTF-8?q?zookeeper=E6=8E=A5=E5=85=A5kratos=20=20?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E6=B3=A8=E5=86=8C=E5=8F=91=E7=8E=B0=E4=B8=8E?= =?UTF-8?q?=E8=B4=9F=E8=BD=BD=E5=9D=87=E8=A1=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- go.mod | 1 + go.sum | 2 + pkg/naming/zookeeper/zookeeper.go | 397 ++++++++++++++++++++++++++++++ 3 files changed, 400 insertions(+) create mode 100644 pkg/naming/zookeeper/zookeeper.go diff --git a/go.mod b/go.mod index ce078a8e7..67d49efd3 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ require ( github.com/go-playground/locales v0.12.1 // indirect github.com/go-playground/universal-translator v0.16.0 // indirect github.com/go-sql-driver/mysql v1.4.1 + github.com/go-zookeeper/zk v1.0.1 // indirect github.com/gogo/protobuf v1.3.0 github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect github.com/golang/mock v1.3.1 // indirect diff --git a/go.sum b/go.sum index 1f489a404..65b167ca5 100644 --- a/go.sum +++ b/go.sum @@ -71,6 +71,8 @@ github.com/go-playground/universal-translator v0.16.0/go.mod h1:1AnU7NaIRDWWzGEK github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-zookeeper/zk v1.0.1 h1:LmXNmSnkNsNKai+aDu6sHRr8ZJzIrHJo8z8Z4sm8cT8= +github.com/go-zookeeper/zk v1.0.1/go.mod h1:gpJdHazfkmlg4V0rt0vYeHYJHSL8hHFwV0qOd+HRTJE= github.com/gogo/googleapis v1.1.0/go.mod h1:gf4bu3Q80BeJ6H1S1vYPm8/ELATdvryBaNFGgqEef3s= github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= github.com/gogo/protobuf v1.2.0/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= diff --git a/pkg/naming/zookeeper/zookeeper.go b/pkg/naming/zookeeper/zookeeper.go new file mode 100644 index 000000000..778e8a5a3 --- /dev/null +++ b/pkg/naming/zookeeper/zookeeper.go @@ -0,0 +1,397 @@ +package zookeeper + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/bilibili/kratos/pkg/log" + "github.com/bilibili/kratos/pkg/naming" + "github.com/go-zookeeper/zk" +) + +const ( + RootPath = "/" +) + +type Config struct { + // Endpoints is a list of URLs. + Endpoints []string `json:"endpoints"` +} + +var ( + _once sync.Once + _builder naming.Builder + + //ErrDuplication is a register duplication err + ErrDuplication = errors.New("zookeeper: instance duplicate registration") +) + +// Builder return default zookeeper resolver builder. +func Builder(c *Config) naming.Builder { + _once.Do(func() { + _builder, _ = New(c) + }) + return _builder +} + +// Build register resolver into default zookeeper. +func Build(c *Config, id string) naming.Resolver { + return Builder(c).Build(id) +} + +// ZookeeperBuilder is a zookeeper client Builder +type ZookeeperBuilder struct { + cli *zk.Conn + connEvent <-chan zk.Event + ctx context.Context + cancelFunc context.CancelFunc + + mutex sync.RWMutex + apps map[string]*appInfo + registry map[string]struct{} +} + +type appInfo struct { + resolver map[*Resolve]struct{} + ins atomic.Value + zkb *ZookeeperBuilder + once sync.Once +} + +// Resolve zookeeper resolver. +type Resolve struct { + id string + event chan struct{} + zkb *ZookeeperBuilder +} + +// New is new a zookeeper builder +func New(c *Config) (zkb *ZookeeperBuilder, err error) { + //example: endpointSli = []string{"192.168.1.78:2181", "192.168.1.79:2181", "192.168.1.80:2181"} + if len(c.Endpoints) == 0 { + errInfo := fmt.Sprintf("zookeeper New failed, endpoints is null") + log.Error(errInfo) + return nil, errors.New(errInfo) + } + + zkConn, connEvent, err := zk.Connect(c.Endpoints, 5*time.Second) + if err != nil { + log.Error(fmt.Sprintf("zk Connect err:(%v)", err)) + return + } else { + log.Info(fmt.Sprintf("zk Connect ok!")) + } + + ctx, cancel := context.WithCancel(context.Background()) + zkb = &ZookeeperBuilder{ + cli: zkConn, + connEvent: connEvent, + ctx: ctx, + cancelFunc: cancel, + apps: map[string]*appInfo{}, + registry: map[string]struct{}{}, + } + return +} + +// Build zookeeper resovler builder. +func (z *ZookeeperBuilder) Build(appid string) naming.Resolver { + r := &Resolve{ + id: appid, + zkb: z, + event: make(chan struct{}, 1), + } + z.mutex.Lock() + app, ok := z.apps[appid] + if !ok { + app = &appInfo{ + resolver: make(map[*Resolve]struct{}), + zkb: z, + } + z.apps[appid] = app + } + app.resolver[r] = struct{}{} + z.mutex.Unlock() + if ok { + select { + case r.event <- struct{}{}: + default: + } + } + + app.once.Do(func() { + go app.watch(appid) + log.Info("zookeeper: AddWatch(%s) already watch(%v)", appid, ok) + }) + return r +} + +// Scheme return zookeeper's scheme +func (z *ZookeeperBuilder) Scheme() string { + return "zookeeper" +} + +// Register is register instance +func (z *ZookeeperBuilder) Register(ctx context.Context, ins *naming.Instance) (cancelFunc context.CancelFunc, err error) { + z.mutex.Lock() + if _, ok := z.registry[ins.AppID]; ok { + err = ErrDuplication + } else { + z.registry[ins.AppID] = struct{}{} + } + z.mutex.Unlock() + if err != nil { + return + } + ctx, cancel := context.WithCancel(z.ctx) + if err = z.register(ctx, ins); err != nil { + z.mutex.Lock() + delete(z.registry, ins.AppID) + z.mutex.Unlock() + cancel() + return + } + ch := make(chan struct{}, 1) + cancelFunc = context.CancelFunc(func() { + cancel() + <-ch + }) + + go func() { + for { + select { + case connEvent := <-z.connEvent: + log.Warn("watch zkClient state, connEvent:(%v)", connEvent) + if connEvent.State == zk.StateHasSession { + log.Warn("watch zkClient state, state is StateHasSession...") + err = z.register(ctx, ins) + if err != nil { + log.Warn(fmt.Sprintf("watch zkClient state, fail to register node error:(%v)", err)) + continue + } + } + case <-time.After(time.Second): + } + } + }() + return +} + +func (z *ZookeeperBuilder) registerPerServer(name string) (err error) { + var ( + str string + ) + + str, err = z.cli.Create(name, nil, 0, zk.WorldACL(zk.PermAll)) + if err != nil { + log.Warn(fmt.Sprintf("registerPerServer, fail to Create node:(%s). err:(%v)", name, err)) + } else { + log.Info(fmt.Sprintf("registerPerServer, succeed to Create node:(%s). retStr:(%s)", name, str)) + } + + return +} + +func (z *ZookeeperBuilder) registerEphServer(name, host string, ins *naming.Instance) (err error) { + var ( + str string + ) + + val, _ := json.Marshal(ins) + log.Info(fmt.Sprintf("registerEphServer, ins after json.Marshal:(%v)", string(val))) + + str, err = z.cli.Create(name+host, val, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) + if err != nil { + log.Warn(fmt.Sprintf("registerEphServer, fail to Create node:%s. err:(%v)", name+host, err)) + } else { + log.Info(fmt.Sprintf("registerEphServer, succeed to Create node:%s. retStr:(%s)", name+host, str)) + } + + return +} + +// register 注册zookeeper节点 +func (z *ZookeeperBuilder) register(ctx context.Context, ins *naming.Instance) (err error) { + log.Info("zookeeper register enter, instance Addrs:(%v)", ins.Addrs) + prefix := z.keyPrefix(ins) + + err = z.registerPerServer(prefix) + if err != nil { + log.Warn(fmt.Sprintf("register, fail to registerPerServer node error:(%v)", err)) + } + + var svrAddr string + for _, val := range ins.Addrs { + if strings.HasPrefix(val, "grpc://") { + svrAddr = strings.TrimPrefix(val, "grpc://") + break + } + } + if svrAddr == "" { + errInfo := fmt.Sprintf("register, error occur, grpc svrAddr is null") + log.Error(errInfo) + return errors.New(errInfo) + } + + err = z.registerEphServer(prefix, RootPath+svrAddr, ins) + if err != nil { + log.Error(fmt.Sprintf("registerServer, fail to RegisterEphServer node error:(%v)", err)) + //return + } else { + log.Info(fmt.Sprintf("registerServer, succeed to RegistServer node.")) + } + + return nil +} + +// unregister 删除zookeeper中节点信息 +func (z *ZookeeperBuilder) unregister(ins *naming.Instance) (err error) { + log.Info("zookeeper unregister enter, instance Addrs:(%v)", ins.Addrs) + prefix := z.keyPrefix(ins) + + var svrAddr string + for _, val := range ins.Addrs { + if strings.HasPrefix(val, "grpc://") { + svrAddr = strings.TrimPrefix(val, "grpc://") + break + } + } + if svrAddr == "" { + errInfo := fmt.Sprintf("unregister, error occur, grpc svrAddr is null") + log.Error(errInfo) + return errors.New(errInfo) + } + + strNode := prefix + RootPath + svrAddr + exists, _, err := z.cli.Exists(strNode) + if err != nil { + log.Error("zk.Conn.Exists node:(%v), error:(%s)", strNode, err.Error()) + return err + } + if exists { + _, s, err := z.cli.Get(strNode) + if err != nil { + log.Error("zk.Conn.Get node:(%s), error:(%s)", strNode, err.Error()) + return err + } + return z.cli.Delete(strNode, s.Version) + } + + log.Info(fmt.Sprintf("unregister, client.Delete:(%v), appid:(%v), hostname:(%v) success", strNode, ins.AppID, ins.Hostname)) + return +} + +func (z *ZookeeperBuilder) keyPrefix(ins *naming.Instance) string { + return fmt.Sprintf("/%s", ins.AppID) +} + +// Close stop all running process including zk fetch and register +func (z *ZookeeperBuilder) Close() error { + z.cancelFunc() + return nil +} + +func (a *appInfo) watch(appID string) { + _ = a.fetchstore(appID) + prefix := fmt.Sprintf("/%s", appID) + + go func() { + for { + log.Info(fmt.Sprintf("zk ChildrenW enter, prefix:(%v)", prefix)) + snapshot, _, event, err := a.zkb.cli.ChildrenW(prefix) + if err != nil { + continue + } + + log.Info(fmt.Sprintf("zk ChildrenW ok, snapshot:(%v)", snapshot)) + for ev := range event { + log.Info(fmt.Sprintf("zk ChildrenW ok, prefix:(%v), event Path:(%v), Type:(%v)", prefix, ev.Path, ev.Type)) + if ev.Type == zk.EventNodeChildrenChanged { + _ = a.fetchstore(appID) + } + } + } + }() +} + +func (a *appInfo) fetchstore(appID string) (err error) { + prefix := fmt.Sprintf("/%s", appID) + strNode := "" + childs, _, err := a.zkb.cli.Children(prefix) + if err != nil { + log.Error(fmt.Sprintf("fetchstore, fail to get Children of node:(%v), err:(%v)", prefix, err)) + } else { + log.Info(fmt.Sprintf("fetchstore, ok to get Children of node:(%v), childs:(%v)", prefix, childs)) + } + + ins := &naming.InstancesInfo{ + Instances: make(map[string][]*naming.Instance, 0), + } + + //for range childs + for _, child := range childs { + strNode = prefix + RootPath + child + resp, _, err := a.zkb.cli.Get(strNode) + if err != nil { + log.Error("zookeeper: fetch client.Get(%s) error(%v)", strNode, err) + return err + } + + in := new(naming.Instance) + err = json.Unmarshal(resp, in) + if err != nil { + return err + } + ins.Instances[in.Zone] = append(ins.Instances[in.Zone], in) + + } + a.store(ins) + + return nil +} + +func (a *appInfo) store(ins *naming.InstancesInfo) { + + a.ins.Store(ins) + a.zkb.mutex.RLock() + for rs := range a.resolver { + select { + case rs.event <- struct{}{}: + default: + } + } + a.zkb.mutex.RUnlock() +} + +// Watch watch instance. +func (r *Resolve) Watch() <-chan struct{} { + return r.event +} + +// Fetch fetch resolver instance. +func (r *Resolve) Fetch(ctx context.Context) (ins *naming.InstancesInfo, ok bool) { + r.zkb.mutex.RLock() + app, ok := r.zkb.apps[r.id] + r.zkb.mutex.RUnlock() + if ok { + ins, ok = app.ins.Load().(*naming.InstancesInfo) + return + } + return +} + +// Close close resolver. +func (r *Resolve) Close() error { + r.zkb.mutex.Lock() + if app, ok := r.zkb.apps[r.id]; ok && len(app.resolver) != 0 { + delete(app.resolver, r) + } + r.zkb.mutex.Unlock() + return nil +} From d9f57d5647f7f463317329ee9bc8e547a313fe48 Mon Sep 17 00:00:00 2001 From: guobin <596194783@qq.com> Date: Wed, 23 Oct 2019 15:50:06 +0800 Subject: [PATCH 2/2] =?UTF-8?q?zookeeper=E6=8E=A5=E5=85=A5kratos=20=20?= =?UTF-8?q?=E6=9C=8D=E5=8A=A1=E6=B3=A8=E5=86=8C=E5=8F=91=E7=8E=B0=E4=B8=8E?= =?UTF-8?q?=E8=B4=9F=E8=BD=BD=E5=9D=87=E8=A1=A1=201.=20=E5=8A=A0=E5=85=A5c?= =?UTF-8?q?tx.Done=E7=9A=84=E6=83=85=E5=86=B5=E9=98=B2=E6=AD=A2groutine=20?= =?UTF-8?q?=E6=B3=84=E9=9C=B2=202.=20=E6=B3=A8=E5=86=8C=E7=9A=84=E5=8D=8F?= =?UTF-8?q?=E8=AE=AE=EF=BC=8C=E5=85=81=E8=AE=B8=E6=B3=A8=E5=86=8C=E5=A4=9A?= =?UTF-8?q?=E7=A7=8D=E5=8D=8F=E8=AE=AE=EF=BC=8C=E5=85=B7=E4=BD=93=E9=80=89?= =?UTF-8?q?=E6=8B=A9=E5=93=AA=E7=A7=8D=E5=8D=8F=E8=AE=AE=E6=9C=89client?= =?UTF-8?q?=E7=AB=AF=E9=80=89=E6=8B=A9=203.=20=E5=8E=BB=E6=8E=89RootPath?= =?UTF-8?q?=E5=B8=B8=E9=87=8F=EF=BC=8C=E9=81=BF=E5=85=8D=E6=AD=A7=E4=B9=89?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pkg/naming/zookeeper/zookeeper.go | 69 ++++++++++--------------------- 1 file changed, 22 insertions(+), 47 deletions(-) diff --git a/pkg/naming/zookeeper/zookeeper.go b/pkg/naming/zookeeper/zookeeper.go index 778e8a5a3..9e50f130f 100644 --- a/pkg/naming/zookeeper/zookeeper.go +++ b/pkg/naming/zookeeper/zookeeper.go @@ -5,7 +5,6 @@ import ( "encoding/json" "errors" "fmt" - "strings" "sync" "sync/atomic" "time" @@ -15,10 +14,6 @@ import ( "github.com/go-zookeeper/zk" ) -const ( - RootPath = "/" -) - type Config struct { // Endpoints is a list of URLs. Endpoints []string `json:"endpoints"` @@ -176,7 +171,9 @@ func (z *ZookeeperBuilder) Register(ctx context.Context, ins *naming.Instance) ( continue } } - case <-time.After(time.Second): + case <-ctx.Done(): + ch <- struct{}{} + return } } }() @@ -226,26 +223,14 @@ func (z *ZookeeperBuilder) register(ctx context.Context, ins *naming.Instance) ( log.Warn(fmt.Sprintf("register, fail to registerPerServer node error:(%v)", err)) } - var svrAddr string for _, val := range ins.Addrs { - if strings.HasPrefix(val, "grpc://") { - svrAddr = strings.TrimPrefix(val, "grpc://") - break + err = z.registerEphServer(prefix, "/"+val, ins) + if err != nil { + log.Warn(fmt.Sprintf("registerServer, fail to RegisterEphServer node error:(%v)", err)) + } else { + log.Info(fmt.Sprintf("registerServer, succeed to RegistServer node.")) } } - if svrAddr == "" { - errInfo := fmt.Sprintf("register, error occur, grpc svrAddr is null") - log.Error(errInfo) - return errors.New(errInfo) - } - - err = z.registerEphServer(prefix, RootPath+svrAddr, ins) - if err != nil { - log.Error(fmt.Sprintf("registerServer, fail to RegisterEphServer node error:(%v)", err)) - //return - } else { - log.Info(fmt.Sprintf("registerServer, succeed to RegistServer node.")) - } return nil } @@ -255,35 +240,25 @@ func (z *ZookeeperBuilder) unregister(ins *naming.Instance) (err error) { log.Info("zookeeper unregister enter, instance Addrs:(%v)", ins.Addrs) prefix := z.keyPrefix(ins) - var svrAddr string for _, val := range ins.Addrs { - if strings.HasPrefix(val, "grpc://") { - svrAddr = strings.TrimPrefix(val, "grpc://") - break - } - } - if svrAddr == "" { - errInfo := fmt.Sprintf("unregister, error occur, grpc svrAddr is null") - log.Error(errInfo) - return errors.New(errInfo) - } - - strNode := prefix + RootPath + svrAddr - exists, _, err := z.cli.Exists(strNode) - if err != nil { - log.Error("zk.Conn.Exists node:(%v), error:(%s)", strNode, err.Error()) - return err - } - if exists { - _, s, err := z.cli.Get(strNode) + strNode := prefix + "/" + val + exists, _, err := z.cli.Exists(strNode) if err != nil { - log.Error("zk.Conn.Get node:(%s), error:(%s)", strNode, err.Error()) + log.Error("zk.Conn.Exists node:(%v), error:(%s)", strNode, err.Error()) return err } - return z.cli.Delete(strNode, s.Version) + if exists { + _, s, err := z.cli.Get(strNode) + if err != nil { + log.Error("zk.Conn.Get node:(%s), error:(%s)", strNode, err.Error()) + return err + } + return z.cli.Delete(strNode, s.Version) + } + + log.Info(fmt.Sprintf("unregister, client.Delete:(%v), appid:(%v), hostname:(%v) success", strNode, ins.AppID, ins.Hostname)) } - log.Info(fmt.Sprintf("unregister, client.Delete:(%v), appid:(%v), hostname:(%v) success", strNode, ins.AppID, ins.Hostname)) return } @@ -336,7 +311,7 @@ func (a *appInfo) fetchstore(appID string) (err error) { //for range childs for _, child := range childs { - strNode = prefix + RootPath + child + strNode = prefix + "/" + child resp, _, err := a.zkb.cli.Get(strNode) if err != nil { log.Error("zookeeper: fetch client.Get(%s) error(%v)", strNode, err)