diff --git a/.travis.yml b/.travis.yml index b9bfc6413..fb8840993 100644 --- a/.travis.yml +++ b/.travis.yml @@ -23,10 +23,13 @@ env: - DOCKER_COMPOSE_VERSION=1.24.1 before_install: + # docker-compose - sudo rm /usr/local/bin/docker-compose - curl -L https://github.com/docker/compose/releases/download/${DOCKER_COMPOSE_VERSION}/docker-compose-`uname -s`-`uname -m` > docker-compose - chmod +x docker-compose - sudo mv docker-compose /usr/local/bin + # zookeeper + - "sudo apt-get install -y zookeeper-server" # Skip the install step. Don't `go get` dependencies. Only build with the code # in vendor/ @@ -37,9 +40,13 @@ install: true # Make sure golangci-lint is vendored. before_script: - curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh| sh -s -- -b $GOPATH/bin + # discovery - curl -sfL https://raw.githubusercontent.com/bilibili/discovery/master/install.sh | sh -s -- -b $GOPATH/bin - curl -sfL https://raw.githubusercontent.com/bilibili/discovery/master/cmd/discovery/discovery-example.toml -o $GOPATH/bin/discovery.toml - nohup bash -c "$GOPATH/bin/discovery -conf $GOPATH/bin/discovery.toml &" + # zookeeper + - "sudo service zookeeper-server init" + - "sudo service zookeeper-server start" script: - go build ./... diff --git a/go.mod b/go.mod index 6efa655a2..8a81255ea 100644 --- a/go.mod +++ b/go.mod @@ -17,7 +17,7 @@ require ( github.com/go-playground/locales v0.12.1 // indirect github.com/go-playground/universal-translator v0.16.0 // indirect github.com/go-sql-driver/mysql v1.4.1 - github.com/go-zookeeper/zk v1.0.1 // indirect + github.com/go-zookeeper/zk v1.0.1 github.com/gogo/protobuf v1.3.0 github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect github.com/golang/mock v1.3.1 // indirect diff --git a/pkg/naming/zookeeper/zookeeper.go b/pkg/naming/zookeeper/zookeeper.go index 9e50f130f..92662a058 100644 --- a/pkg/naming/zookeeper/zookeeper.go +++ b/pkg/naming/zookeeper/zookeeper.go @@ -5,25 +5,30 @@ import ( "encoding/json" "errors" "fmt" + "path" + "strings" "sync" "sync/atomic" "time" "github.com/bilibili/kratos/pkg/log" "github.com/bilibili/kratos/pkg/naming" + xtime "github.com/bilibili/kratos/pkg/time" "github.com/go-zookeeper/zk" ) +// Config is zookeeper config. type Config struct { - // Endpoints is a list of URLs. - Endpoints []string `json:"endpoints"` + Root string `json:"root"` + Endpoints []string `json:"endpoints"` + Timeout xtime.Duration `json:"timeout"` } var ( _once sync.Once _builder naming.Builder - //ErrDuplication is a register duplication err + // ErrDuplication is a register duplication err ErrDuplication = errors.New("zookeeper: instance duplicate registration") ) @@ -40,22 +45,10 @@ func Build(c *Config, id string) naming.Resolver { return Builder(c).Build(id) } -// ZookeeperBuilder is a zookeeper client Builder -type ZookeeperBuilder struct { - cli *zk.Conn - connEvent <-chan zk.Event - ctx context.Context - cancelFunc context.CancelFunc - - mutex sync.RWMutex - apps map[string]*appInfo - registry map[string]struct{} -} - type appInfo struct { resolver map[*Resolve]struct{} ins atomic.Value - zkb *ZookeeperBuilder + zkb *Zookeeper once sync.Once } @@ -63,28 +56,43 @@ type appInfo struct { type Resolve struct { id string event chan struct{} - zkb *ZookeeperBuilder + zkb *Zookeeper +} + +// Zookeeper is a zookeeper client Builder. +type Zookeeper struct { + c *Config + cli *zk.Conn + connEvent <-chan zk.Event + ctx context.Context + cancelFunc context.CancelFunc + + mutex sync.RWMutex + apps map[string]*appInfo + registry map[string]struct{} } -// New is new a zookeeper builder -func New(c *Config) (zkb *ZookeeperBuilder, err error) { - //example: endpointSli = []string{"192.168.1.78:2181", "192.168.1.79:2181", "192.168.1.80:2181"} +// New is new a zookeeper builder. +func New(c *Config) (zkb *Zookeeper, err error) { + if c.Timeout == 0 { + c.Timeout = xtime.Duration(time.Second) + } if len(c.Endpoints) == 0 { errInfo := fmt.Sprintf("zookeeper New failed, endpoints is null") log.Error(errInfo) return nil, errors.New(errInfo) } - zkConn, connEvent, err := zk.Connect(c.Endpoints, 5*time.Second) + zkConn, connEvent, err := zk.Connect(c.Endpoints, time.Duration(c.Timeout)) if err != nil { log.Error(fmt.Sprintf("zk Connect err:(%v)", err)) return - } else { - log.Info(fmt.Sprintf("zk Connect ok!")) } + log.Info(fmt.Sprintf("zk Connect ok!")) ctx, cancel := context.WithCancel(context.Background()) - zkb = &ZookeeperBuilder{ + zkb = &Zookeeper{ + c: c, cli: zkConn, connEvent: connEvent, ctx: ctx, @@ -96,7 +104,7 @@ func New(c *Config) (zkb *ZookeeperBuilder, err error) { } // Build zookeeper resovler builder. -func (z *ZookeeperBuilder) Build(appid string) naming.Resolver { +func (z *Zookeeper) Build(appid string) naming.Resolver { r := &Resolve{ id: appid, zkb: z, @@ -119,21 +127,19 @@ func (z *ZookeeperBuilder) Build(appid string) naming.Resolver { default: } } - app.once.Do(func() { go app.watch(appid) - log.Info("zookeeper: AddWatch(%s) already watch(%v)", appid, ok) }) return r } -// Scheme return zookeeper's scheme -func (z *ZookeeperBuilder) Scheme() string { +// Scheme return zookeeper's scheme. +func (z *Zookeeper) Scheme() string { return "zookeeper" } -// Register is register instance -func (z *ZookeeperBuilder) Register(ctx context.Context, ins *naming.Instance) (cancelFunc context.CancelFunc, err error) { +// Register is register instance. +func (z *Zookeeper) Register(ctx context.Context, ins *naming.Instance) (cancelFunc context.CancelFunc, err error) { z.mutex.Lock() if _, ok := z.registry[ins.AppID]; ok { err = ErrDuplication @@ -157,16 +163,13 @@ func (z *ZookeeperBuilder) Register(ctx context.Context, ins *naming.Instance) ( cancel() <-ch }) - go func() { for { select { case connEvent := <-z.connEvent: - log.Warn("watch zkClient state, connEvent:(%v)", connEvent) + log.Info("watch zkClient state, connEvent:(%+v)", connEvent) if connEvent.State == zk.StateHasSession { - log.Warn("watch zkClient state, state is StateHasSession...") - err = z.register(ctx, ins) - if err != nil { + if err = z.register(ctx, ins); err != nil { log.Warn(fmt.Sprintf("watch zkClient state, fail to register node error:(%v)", err)) continue } @@ -180,110 +183,120 @@ func (z *ZookeeperBuilder) Register(ctx context.Context, ins *naming.Instance) ( return } -func (z *ZookeeperBuilder) registerPerServer(name string) (err error) { +func (z *Zookeeper) createPath(paths string) error { var ( - str string + lastPath = "/" + seps = strings.Split(paths, "/") ) - - str, err = z.cli.Create(name, nil, 0, zk.WorldACL(zk.PermAll)) - if err != nil { - log.Warn(fmt.Sprintf("registerPerServer, fail to Create node:(%s). err:(%v)", name, err)) - } else { - log.Info(fmt.Sprintf("registerPerServer, succeed to Create node:(%s). retStr:(%s)", name, str)) + for _, part := range seps { + if part == "" { + continue + } + lastPath = path.Join(lastPath, part) + ok, _, err := z.cli.Exists(lastPath) + if err != nil { + return err + } + if ok { + continue + } + ret, err := z.cli.Create(lastPath, nil, 0, zk.WorldACL(zk.PermAll)) + if err != nil { + log.Warn(fmt.Sprintf("createPath, fail to Create node:(%s). error:(%v)", paths, err)) + return err + } + log.Info(fmt.Sprintf("createPath, succeed to Create node:(%s). retStr:(%s)", paths, ret)) } - - return + return nil } -func (z *ZookeeperBuilder) registerEphServer(name, host string, ins *naming.Instance) (err error) { +func (z *Zookeeper) registerPeerServer(name, host string, ins *naming.Instance) (err error) { var ( str string ) - - val, _ := json.Marshal(ins) - log.Info(fmt.Sprintf("registerEphServer, ins after json.Marshal:(%v)", string(val))) - + val, err := json.Marshal(ins) + if err != nil { + return + } + log.Info(fmt.Sprintf("registerPeerServer, ins after json.Marshal:(%v)", string(val))) str, err = z.cli.Create(name+host, val, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) if err != nil { - log.Warn(fmt.Sprintf("registerEphServer, fail to Create node:%s. err:(%v)", name+host, err)) + log.Warn(fmt.Sprintf("registerPeerServer, fail to Create node:%s. error:(%v)", name+host, err)) } else { - log.Info(fmt.Sprintf("registerEphServer, succeed to Create node:%s. retStr:(%s)", name+host, str)) + log.Info(fmt.Sprintf("registerPeerServer, succeed to Create node:%s. retStr:(%s)", name+host, str)) } - return } -// register 注册zookeeper节点 -func (z *ZookeeperBuilder) register(ctx context.Context, ins *naming.Instance) (err error) { +// register is register instance to zookeeper. +func (z *Zookeeper) register(ctx context.Context, ins *naming.Instance) (err error) { log.Info("zookeeper register enter, instance Addrs:(%v)", ins.Addrs) - prefix := z.keyPrefix(ins) - err = z.registerPerServer(prefix) - if err != nil { - log.Warn(fmt.Sprintf("register, fail to registerPerServer node error:(%v)", err)) + prefix := z.keyPrefix(ins.AppID) + if err = z.createPath(prefix); err != nil { + log.Warn(fmt.Sprintf("register, fail to createPath node error:(%v)", err)) } - - for _, val := range ins.Addrs { - err = z.registerEphServer(prefix, "/"+val, ins) - if err != nil { - log.Warn(fmt.Sprintf("registerServer, fail to RegisterEphServer node error:(%v)", err)) + for _, addr := range ins.Addrs { + addr = strings.Replace(addr, "://", ":", 1) + if err = z.registerPeerServer(prefix, "/"+addr, ins); err != nil { + log.Warn(fmt.Sprintf("registerServer, fail to RegisterPeerServer node:%s error:(%v)", addr, err)) } else { log.Info(fmt.Sprintf("registerServer, succeed to RegistServer node.")) } } - return nil } -// unregister 删除zookeeper中节点信息 -func (z *ZookeeperBuilder) unregister(ins *naming.Instance) (err error) { +func (z *Zookeeper) unregister(ins *naming.Instance) (err error) { log.Info("zookeeper unregister enter, instance Addrs:(%v)", ins.Addrs) - prefix := z.keyPrefix(ins) - - for _, val := range ins.Addrs { - strNode := prefix + "/" + val + prefix := z.keyPrefix(ins.AppID) + for _, addr := range ins.Addrs { + addr = strings.Replace(addr, ":", "://", 1) + strNode := prefix + "/" + addr exists, _, err := z.cli.Exists(strNode) if err != nil { - log.Error("zk.Conn.Exists node:(%v), error:(%s)", strNode, err.Error()) - return err + log.Error("zk.Conn.Exists node:(%v), error:(%v)", strNode, err) + continue } if exists { _, s, err := z.cli.Get(strNode) if err != nil { - log.Error("zk.Conn.Get node:(%s), error:(%s)", strNode, err.Error()) - return err + log.Error("zk.Conn.Get node:(%s), error:(%v)", strNode, err) + continue + } + if err = z.cli.Delete(strNode, s.Version); err != nil { + log.Error("zk.Conn.Delete node:(%s), error:(%v)", strNode, err) + continue } - return z.cli.Delete(strNode, s.Version) } log.Info(fmt.Sprintf("unregister, client.Delete:(%v), appid:(%v), hostname:(%v) success", strNode, ins.AppID, ins.Hostname)) } - return } -func (z *ZookeeperBuilder) keyPrefix(ins *naming.Instance) string { - return fmt.Sprintf("/%s", ins.AppID) +func (z *Zookeeper) keyPrefix(appID string) string { + return path.Join(z.c.Root, appID) } -// Close stop all running process including zk fetch and register -func (z *ZookeeperBuilder) Close() error { +// Close stop all running process including zk fetch and register. +func (z *Zookeeper) Close() error { z.cancelFunc() return nil } func (a *appInfo) watch(appID string) { _ = a.fetchstore(appID) - prefix := fmt.Sprintf("/%s", appID) - + prefix := a.zkb.keyPrefix(appID) go func() { for { log.Info(fmt.Sprintf("zk ChildrenW enter, prefix:(%v)", prefix)) snapshot, _, event, err := a.zkb.cli.ChildrenW(prefix) if err != nil { + log.Error("zk ChildrenW fail to watch:%s error:(%v)", prefix, err) + time.Sleep(time.Second) continue } - log.Info(fmt.Sprintf("zk ChildrenW ok, snapshot:(%v)", snapshot)) for ev := range event { log.Info(fmt.Sprintf("zk ChildrenW ok, prefix:(%v), event Path:(%v), Type:(%v)", prefix, ev.Path, ev.Type)) @@ -296,28 +309,24 @@ func (a *appInfo) watch(appID string) { } func (a *appInfo) fetchstore(appID string) (err error) { - prefix := fmt.Sprintf("/%s", appID) - strNode := "" + prefix := a.zkb.keyPrefix(appID) childs, _, err := a.zkb.cli.Children(prefix) if err != nil { - log.Error(fmt.Sprintf("fetchstore, fail to get Children of node:(%v), err:(%v)", prefix, err)) + log.Error(fmt.Sprintf("fetchstore, fail to get Children of node:(%v), error:(%v)", prefix, err)) } else { log.Info(fmt.Sprintf("fetchstore, ok to get Children of node:(%v), childs:(%v)", prefix, childs)) } - ins := &naming.InstancesInfo{ Instances: make(map[string][]*naming.Instance, 0), } - - //for range childs + strNode := "" for _, child := range childs { strNode = prefix + "/" + child resp, _, err := a.zkb.cli.Get(strNode) if err != nil { - log.Error("zookeeper: fetch client.Get(%s) error(%v)", strNode, err) + log.Error("zookeeper: fetch client.Get(%s) error:(%v)", strNode, err) return err } - in := new(naming.Instance) err = json.Unmarshal(resp, in) if err != nil { @@ -327,12 +336,10 @@ func (a *appInfo) fetchstore(appID string) (err error) { } a.store(ins) - return nil } func (a *appInfo) store(ins *naming.InstancesInfo) { - a.ins.Store(ins) a.zkb.mutex.RLock() for rs := range a.resolver { diff --git a/pkg/naming/zookeeper/zookeeper_test.go b/pkg/naming/zookeeper/zookeeper_test.go new file mode 100644 index 000000000..d958527af --- /dev/null +++ b/pkg/naming/zookeeper/zookeeper_test.go @@ -0,0 +1,50 @@ +package zookeeper + +import ( + "context" + "testing" + "time" + + "github.com/bilibili/kratos/pkg/naming" +) + +var ( + _testAppid = "test_appid" + + _testConf = &Config{ + Root: "/test", + Endpoints: []string{"127.0.0.1:2181"}, + } + _testIns = &naming.Instance{ + AppID: _testAppid, + Addrs: []string{"grpc://127.0.0.1:9000"}, + Metadata: map[string]string{ + "test_key": "test_value", + }, + } +) + +func TestZookeeper(t *testing.T) { + zk, err := New(_testConf) + if err != nil { + t.Fatal(err) + } + res := zk.Build(_testAppid) + go func() { + for event := range res.Watch() { + t.Log(event) + } + }() + _, err = zk.Register(context.TODO(), _testIns) + if err != nil { + t.Fatal(err) + } + time.Sleep(time.Second) + in, ok := res.Fetch(context.TODO()) + if !ok { + t.Fatal("failed to resolver fetch") + } + if len(in.Instances) != 1 { + t.Fatalf("Instances not match, got:%d want:1", len(in.Instances)) + } +}