add zk test

pull/412/head
Tony 5 years ago
parent 3a16ed706b
commit 56e06c61a4
  1. 7
      .travis.yml
  2. 2
      go.mod
  3. 191
      pkg/naming/zookeeper/zookeeper.go
  4. 50
      pkg/naming/zookeeper/zookeeper_test.go

@ -23,10 +23,13 @@ env:
- DOCKER_COMPOSE_VERSION=1.24.1 - DOCKER_COMPOSE_VERSION=1.24.1
before_install: before_install:
# docker-compose
- sudo rm /usr/local/bin/docker-compose - sudo rm /usr/local/bin/docker-compose
- curl -L https://github.com/docker/compose/releases/download/${DOCKER_COMPOSE_VERSION}/docker-compose-`uname -s`-`uname -m` > docker-compose - curl -L https://github.com/docker/compose/releases/download/${DOCKER_COMPOSE_VERSION}/docker-compose-`uname -s`-`uname -m` > docker-compose
- chmod +x docker-compose - chmod +x docker-compose
- sudo mv docker-compose /usr/local/bin - sudo mv docker-compose /usr/local/bin
# zookeeper
- "sudo apt-get install -y zookeeper-server"
# Skip the install step. Don't `go get` dependencies. Only build with the code # Skip the install step. Don't `go get` dependencies. Only build with the code
# in vendor/ # in vendor/
@ -37,9 +40,13 @@ install: true
# Make sure golangci-lint is vendored. # Make sure golangci-lint is vendored.
before_script: before_script:
- curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh| sh -s -- -b $GOPATH/bin - curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh| sh -s -- -b $GOPATH/bin
# discovery
- curl -sfL https://raw.githubusercontent.com/bilibili/discovery/master/install.sh | sh -s -- -b $GOPATH/bin - curl -sfL https://raw.githubusercontent.com/bilibili/discovery/master/install.sh | sh -s -- -b $GOPATH/bin
- curl -sfL https://raw.githubusercontent.com/bilibili/discovery/master/cmd/discovery/discovery-example.toml -o $GOPATH/bin/discovery.toml - curl -sfL https://raw.githubusercontent.com/bilibili/discovery/master/cmd/discovery/discovery-example.toml -o $GOPATH/bin/discovery.toml
- nohup bash -c "$GOPATH/bin/discovery -conf $GOPATH/bin/discovery.toml &" - nohup bash -c "$GOPATH/bin/discovery -conf $GOPATH/bin/discovery.toml &"
# zookeeper
- "sudo service zookeeper-server init"
- "sudo service zookeeper-server start"
script: script:
- go build ./... - go build ./...

@ -17,7 +17,7 @@ require (
github.com/go-playground/locales v0.12.1 // indirect github.com/go-playground/locales v0.12.1 // indirect
github.com/go-playground/universal-translator v0.16.0 // indirect github.com/go-playground/universal-translator v0.16.0 // indirect
github.com/go-sql-driver/mysql v1.4.1 github.com/go-sql-driver/mysql v1.4.1
github.com/go-zookeeper/zk v1.0.1 // indirect github.com/go-zookeeper/zk v1.0.1
github.com/gogo/protobuf v1.3.0 github.com/gogo/protobuf v1.3.0
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
github.com/golang/mock v1.3.1 // indirect github.com/golang/mock v1.3.1 // indirect

@ -5,18 +5,23 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"path"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
"github.com/bilibili/kratos/pkg/log" "github.com/bilibili/kratos/pkg/log"
"github.com/bilibili/kratos/pkg/naming" "github.com/bilibili/kratos/pkg/naming"
xtime "github.com/bilibili/kratos/pkg/time"
"github.com/go-zookeeper/zk" "github.com/go-zookeeper/zk"
) )
// Config is zookeeper config.
type Config struct { type Config struct {
// Endpoints is a list of URLs. Root string `json:"root"`
Endpoints []string `json:"endpoints"` Endpoints []string `json:"endpoints"`
Timeout xtime.Duration `json:"timeout"`
} }
var ( var (
@ -40,22 +45,10 @@ func Build(c *Config, id string) naming.Resolver {
return Builder(c).Build(id) return Builder(c).Build(id)
} }
// ZookeeperBuilder is a zookeeper client Builder
type ZookeeperBuilder struct {
cli *zk.Conn
connEvent <-chan zk.Event
ctx context.Context
cancelFunc context.CancelFunc
mutex sync.RWMutex
apps map[string]*appInfo
registry map[string]struct{}
}
type appInfo struct { type appInfo struct {
resolver map[*Resolve]struct{} resolver map[*Resolve]struct{}
ins atomic.Value ins atomic.Value
zkb *ZookeeperBuilder zkb *Zookeeper
once sync.Once once sync.Once
} }
@ -63,28 +56,43 @@ type appInfo struct {
type Resolve struct { type Resolve struct {
id string id string
event chan struct{} event chan struct{}
zkb *ZookeeperBuilder zkb *Zookeeper
}
// Zookeeper is a zookeeper client Builder.
type Zookeeper struct {
c *Config
cli *zk.Conn
connEvent <-chan zk.Event
ctx context.Context
cancelFunc context.CancelFunc
mutex sync.RWMutex
apps map[string]*appInfo
registry map[string]struct{}
} }
// New is new a zookeeper builder // New is new a zookeeper builder.
func New(c *Config) (zkb *ZookeeperBuilder, err error) { func New(c *Config) (zkb *Zookeeper, err error) {
//example: endpointSli = []string{"192.168.1.78:2181", "192.168.1.79:2181", "192.168.1.80:2181"} if c.Timeout == 0 {
c.Timeout = xtime.Duration(time.Second)
}
if len(c.Endpoints) == 0 { if len(c.Endpoints) == 0 {
errInfo := fmt.Sprintf("zookeeper New failed, endpoints is null") errInfo := fmt.Sprintf("zookeeper New failed, endpoints is null")
log.Error(errInfo) log.Error(errInfo)
return nil, errors.New(errInfo) return nil, errors.New(errInfo)
} }
zkConn, connEvent, err := zk.Connect(c.Endpoints, 5*time.Second) zkConn, connEvent, err := zk.Connect(c.Endpoints, time.Duration(c.Timeout))
if err != nil { if err != nil {
log.Error(fmt.Sprintf("zk Connect err:(%v)", err)) log.Error(fmt.Sprintf("zk Connect err:(%v)", err))
return return
} else {
log.Info(fmt.Sprintf("zk Connect ok!"))
} }
log.Info(fmt.Sprintf("zk Connect ok!"))
ctx, cancel := context.WithCancel(context.Background()) ctx, cancel := context.WithCancel(context.Background())
zkb = &ZookeeperBuilder{ zkb = &Zookeeper{
c: c,
cli: zkConn, cli: zkConn,
connEvent: connEvent, connEvent: connEvent,
ctx: ctx, ctx: ctx,
@ -96,7 +104,7 @@ func New(c *Config) (zkb *ZookeeperBuilder, err error) {
} }
// Build zookeeper resovler builder. // Build zookeeper resovler builder.
func (z *ZookeeperBuilder) Build(appid string) naming.Resolver { func (z *Zookeeper) Build(appid string) naming.Resolver {
r := &Resolve{ r := &Resolve{
id: appid, id: appid,
zkb: z, zkb: z,
@ -119,21 +127,19 @@ func (z *ZookeeperBuilder) Build(appid string) naming.Resolver {
default: default:
} }
} }
app.once.Do(func() { app.once.Do(func() {
go app.watch(appid) go app.watch(appid)
log.Info("zookeeper: AddWatch(%s) already watch(%v)", appid, ok)
}) })
return r return r
} }
// Scheme return zookeeper's scheme // Scheme return zookeeper's scheme.
func (z *ZookeeperBuilder) Scheme() string { func (z *Zookeeper) Scheme() string {
return "zookeeper" return "zookeeper"
} }
// Register is register instance // Register is register instance.
func (z *ZookeeperBuilder) Register(ctx context.Context, ins *naming.Instance) (cancelFunc context.CancelFunc, err error) { func (z *Zookeeper) Register(ctx context.Context, ins *naming.Instance) (cancelFunc context.CancelFunc, err error) {
z.mutex.Lock() z.mutex.Lock()
if _, ok := z.registry[ins.AppID]; ok { if _, ok := z.registry[ins.AppID]; ok {
err = ErrDuplication err = ErrDuplication
@ -157,16 +163,13 @@ func (z *ZookeeperBuilder) Register(ctx context.Context, ins *naming.Instance) (
cancel() cancel()
<-ch <-ch
}) })
go func() { go func() {
for { for {
select { select {
case connEvent := <-z.connEvent: case connEvent := <-z.connEvent:
log.Warn("watch zkClient state, connEvent:(%v)", connEvent) log.Info("watch zkClient state, connEvent:(%+v)", connEvent)
if connEvent.State == zk.StateHasSession { if connEvent.State == zk.StateHasSession {
log.Warn("watch zkClient state, state is StateHasSession...") if err = z.register(ctx, ins); err != nil {
err = z.register(ctx, ins)
if err != nil {
log.Warn(fmt.Sprintf("watch zkClient state, fail to register node error:(%v)", err)) log.Warn(fmt.Sprintf("watch zkClient state, fail to register node error:(%v)", err))
continue continue
} }
@ -180,110 +183,120 @@ func (z *ZookeeperBuilder) Register(ctx context.Context, ins *naming.Instance) (
return return
} }
func (z *ZookeeperBuilder) registerPerServer(name string) (err error) { func (z *Zookeeper) createPath(paths string) error {
var ( var (
str string lastPath = "/"
seps = strings.Split(paths, "/")
) )
for _, part := range seps {
str, err = z.cli.Create(name, nil, 0, zk.WorldACL(zk.PermAll)) if part == "" {
continue
}
lastPath = path.Join(lastPath, part)
ok, _, err := z.cli.Exists(lastPath)
if err != nil { if err != nil {
log.Warn(fmt.Sprintf("registerPerServer, fail to Create node:(%s). err:(%v)", name, err)) return err
} else {
log.Info(fmt.Sprintf("registerPerServer, succeed to Create node:(%s). retStr:(%s)", name, str))
} }
if ok {
return continue
}
ret, err := z.cli.Create(lastPath, nil, 0, zk.WorldACL(zk.PermAll))
if err != nil {
log.Warn(fmt.Sprintf("createPath, fail to Create node:(%s). error:(%v)", paths, err))
return err
}
log.Info(fmt.Sprintf("createPath, succeed to Create node:(%s). retStr:(%s)", paths, ret))
}
return nil
} }
func (z *ZookeeperBuilder) registerEphServer(name, host string, ins *naming.Instance) (err error) { func (z *Zookeeper) registerPeerServer(name, host string, ins *naming.Instance) (err error) {
var ( var (
str string str string
) )
val, err := json.Marshal(ins)
val, _ := json.Marshal(ins) if err != nil {
log.Info(fmt.Sprintf("registerEphServer, ins after json.Marshal:(%v)", string(val))) return
}
log.Info(fmt.Sprintf("registerPeerServer, ins after json.Marshal:(%v)", string(val)))
str, err = z.cli.Create(name+host, val, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) str, err = z.cli.Create(name+host, val, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
if err != nil { if err != nil {
log.Warn(fmt.Sprintf("registerEphServer, fail to Create node:%s. err:(%v)", name+host, err)) log.Warn(fmt.Sprintf("registerPeerServer, fail to Create node:%s. error:(%v)", name+host, err))
} else { } else {
log.Info(fmt.Sprintf("registerEphServer, succeed to Create node:%s. retStr:(%s)", name+host, str)) log.Info(fmt.Sprintf("registerPeerServer, succeed to Create node:%s. retStr:(%s)", name+host, str))
} }
return return
} }
// register 注册zookeeper节点 // register is register instance to zookeeper.
func (z *ZookeeperBuilder) register(ctx context.Context, ins *naming.Instance) (err error) { func (z *Zookeeper) register(ctx context.Context, ins *naming.Instance) (err error) {
log.Info("zookeeper register enter, instance Addrs:(%v)", ins.Addrs) log.Info("zookeeper register enter, instance Addrs:(%v)", ins.Addrs)
prefix := z.keyPrefix(ins)
err = z.registerPerServer(prefix) prefix := z.keyPrefix(ins.AppID)
if err != nil { if err = z.createPath(prefix); err != nil {
log.Warn(fmt.Sprintf("register, fail to registerPerServer node error:(%v)", err)) log.Warn(fmt.Sprintf("register, fail to createPath node error:(%v)", err))
} }
for _, addr := range ins.Addrs {
for _, val := range ins.Addrs { addr = strings.Replace(addr, "://", ":", 1)
err = z.registerEphServer(prefix, "/"+val, ins) if err = z.registerPeerServer(prefix, "/"+addr, ins); err != nil {
if err != nil { log.Warn(fmt.Sprintf("registerServer, fail to RegisterPeerServer node:%s error:(%v)", addr, err))
log.Warn(fmt.Sprintf("registerServer, fail to RegisterEphServer node error:(%v)", err))
} else { } else {
log.Info(fmt.Sprintf("registerServer, succeed to RegistServer node.")) log.Info(fmt.Sprintf("registerServer, succeed to RegistServer node."))
} }
} }
return nil return nil
} }
// unregister 删除zookeeper中节点信息 func (z *Zookeeper) unregister(ins *naming.Instance) (err error) {
func (z *ZookeeperBuilder) unregister(ins *naming.Instance) (err error) {
log.Info("zookeeper unregister enter, instance Addrs:(%v)", ins.Addrs) log.Info("zookeeper unregister enter, instance Addrs:(%v)", ins.Addrs)
prefix := z.keyPrefix(ins) prefix := z.keyPrefix(ins.AppID)
for _, addr := range ins.Addrs {
for _, val := range ins.Addrs { addr = strings.Replace(addr, ":", "://", 1)
strNode := prefix + "/" + val strNode := prefix + "/" + addr
exists, _, err := z.cli.Exists(strNode) exists, _, err := z.cli.Exists(strNode)
if err != nil { if err != nil {
log.Error("zk.Conn.Exists node:(%v), error:(%s)", strNode, err.Error()) log.Error("zk.Conn.Exists node:(%v), error:(%v)", strNode, err)
return err continue
} }
if exists { if exists {
_, s, err := z.cli.Get(strNode) _, s, err := z.cli.Get(strNode)
if err != nil { if err != nil {
log.Error("zk.Conn.Get node:(%s), error:(%s)", strNode, err.Error()) log.Error("zk.Conn.Get node:(%s), error:(%v)", strNode, err)
return err continue
}
if err = z.cli.Delete(strNode, s.Version); err != nil {
log.Error("zk.Conn.Delete node:(%s), error:(%v)", strNode, err)
continue
} }
return z.cli.Delete(strNode, s.Version)
} }
log.Info(fmt.Sprintf("unregister, client.Delete:(%v), appid:(%v), hostname:(%v) success", strNode, ins.AppID, ins.Hostname)) log.Info(fmt.Sprintf("unregister, client.Delete:(%v), appid:(%v), hostname:(%v) success", strNode, ins.AppID, ins.Hostname))
} }
return return
} }
func (z *ZookeeperBuilder) keyPrefix(ins *naming.Instance) string { func (z *Zookeeper) keyPrefix(appID string) string {
return fmt.Sprintf("/%s", ins.AppID) return path.Join(z.c.Root, appID)
} }
// Close stop all running process including zk fetch and register // Close stop all running process including zk fetch and register.
func (z *ZookeeperBuilder) Close() error { func (z *Zookeeper) Close() error {
z.cancelFunc() z.cancelFunc()
return nil return nil
} }
func (a *appInfo) watch(appID string) { func (a *appInfo) watch(appID string) {
_ = a.fetchstore(appID) _ = a.fetchstore(appID)
prefix := fmt.Sprintf("/%s", appID) prefix := a.zkb.keyPrefix(appID)
go func() { go func() {
for { for {
log.Info(fmt.Sprintf("zk ChildrenW enter, prefix:(%v)", prefix)) log.Info(fmt.Sprintf("zk ChildrenW enter, prefix:(%v)", prefix))
snapshot, _, event, err := a.zkb.cli.ChildrenW(prefix) snapshot, _, event, err := a.zkb.cli.ChildrenW(prefix)
if err != nil { if err != nil {
log.Error("zk ChildrenW fail to watch:%s error:(%v)", prefix, err)
time.Sleep(time.Second)
continue continue
} }
log.Info(fmt.Sprintf("zk ChildrenW ok, snapshot:(%v)", snapshot)) log.Info(fmt.Sprintf("zk ChildrenW ok, snapshot:(%v)", snapshot))
for ev := range event { for ev := range event {
log.Info(fmt.Sprintf("zk ChildrenW ok, prefix:(%v), event Path:(%v), Type:(%v)", prefix, ev.Path, ev.Type)) log.Info(fmt.Sprintf("zk ChildrenW ok, prefix:(%v), event Path:(%v), Type:(%v)", prefix, ev.Path, ev.Type))
@ -296,28 +309,24 @@ func (a *appInfo) watch(appID string) {
} }
func (a *appInfo) fetchstore(appID string) (err error) { func (a *appInfo) fetchstore(appID string) (err error) {
prefix := fmt.Sprintf("/%s", appID) prefix := a.zkb.keyPrefix(appID)
strNode := ""
childs, _, err := a.zkb.cli.Children(prefix) childs, _, err := a.zkb.cli.Children(prefix)
if err != nil { if err != nil {
log.Error(fmt.Sprintf("fetchstore, fail to get Children of node:(%v), err:(%v)", prefix, err)) log.Error(fmt.Sprintf("fetchstore, fail to get Children of node:(%v), error:(%v)", prefix, err))
} else { } else {
log.Info(fmt.Sprintf("fetchstore, ok to get Children of node:(%v), childs:(%v)", prefix, childs)) log.Info(fmt.Sprintf("fetchstore, ok to get Children of node:(%v), childs:(%v)", prefix, childs))
} }
ins := &naming.InstancesInfo{ ins := &naming.InstancesInfo{
Instances: make(map[string][]*naming.Instance, 0), Instances: make(map[string][]*naming.Instance, 0),
} }
strNode := ""
//for range childs
for _, child := range childs { for _, child := range childs {
strNode = prefix + "/" + child strNode = prefix + "/" + child
resp, _, err := a.zkb.cli.Get(strNode) resp, _, err := a.zkb.cli.Get(strNode)
if err != nil { if err != nil {
log.Error("zookeeper: fetch client.Get(%s) error(%v)", strNode, err) log.Error("zookeeper: fetch client.Get(%s) error:(%v)", strNode, err)
return err return err
} }
in := new(naming.Instance) in := new(naming.Instance)
err = json.Unmarshal(resp, in) err = json.Unmarshal(resp, in)
if err != nil { if err != nil {
@ -327,12 +336,10 @@ func (a *appInfo) fetchstore(appID string) (err error) {
} }
a.store(ins) a.store(ins)
return nil return nil
} }
func (a *appInfo) store(ins *naming.InstancesInfo) { func (a *appInfo) store(ins *naming.InstancesInfo) {
a.ins.Store(ins) a.ins.Store(ins)
a.zkb.mutex.RLock() a.zkb.mutex.RLock()
for rs := range a.resolver { for rs := range a.resolver {

@ -0,0 +1,50 @@
package zookeeper
import (
"context"
"testing"
"time"
"github.com/bilibili/kratos/pkg/naming"
)
var (
_testAppid = "test_appid"
_testConf = &Config{
Root: "/test",
Endpoints: []string{"127.0.0.1:2181"},
}
_testIns = &naming.Instance{
AppID: _testAppid,
Addrs: []string{"grpc://127.0.0.1:9000"},
Metadata: map[string]string{
"test_key": "test_value",
},
}
)
func TestZookeeper(t *testing.T) {
zk, err := New(_testConf)
if err != nil {
t.Fatal(err)
}
res := zk.Build(_testAppid)
go func() {
for event := range res.Watch() {
t.Log(event)
}
}()
_, err = zk.Register(context.TODO(), _testIns)
if err != nil {
t.Fatal(err)
}
time.Sleep(time.Second)
in, ok := res.Fetch(context.TODO())
if !ok {
t.Fatal("failed to resolver fetch")
}
if len(in.Instances) != 1 {
t.Fatalf("Instances not match, got:%d want:1", len(in.Instances))
}
}
Loading…
Cancel
Save