diff --git a/.travis.yml b/.travis.yml index b9bfc6413..3d041a9b6 100644 --- a/.travis.yml +++ b/.travis.yml @@ -21,12 +21,19 @@ env: - DISCOVERY_NODES=127.0.0.1:7171 - HTTP_PERF=tcp://0.0.0.0:0 - DOCKER_COMPOSE_VERSION=1.24.1 + - ZK_VERSION=3.5.6 before_install: + # docker-compose - sudo rm /usr/local/bin/docker-compose - curl -L https://github.com/docker/compose/releases/download/${DOCKER_COMPOSE_VERSION}/docker-compose-`uname -s`-`uname -m` > docker-compose - chmod +x docker-compose - sudo mv docker-compose /usr/local/bin + # zookeeper + - wget "http://apache.cs.utah.edu/zookeeper/zookeeper-${ZK_VERSION}/apache-zookeeper-${ZK_VERSION}-bin.tar.gz" + - tar -xvf "apache-zookeeper-${ZK_VERSION}-bin.tar.gz" + - mv apache-zookeeper-${ZK_VERSION}-bin zk + - chmod +x ./zk/bin/zkServer.sh # Skip the install step. Don't `go get` dependencies. Only build with the code # in vendor/ @@ -37,9 +44,12 @@ install: true # Make sure golangci-lint is vendored. before_script: - curl -sfL https://raw.githubusercontent.com/golangci/golangci-lint/master/install.sh| sh -s -- -b $GOPATH/bin + # discovery - curl -sfL https://raw.githubusercontent.com/bilibili/discovery/master/install.sh | sh -s -- -b $GOPATH/bin - curl -sfL https://raw.githubusercontent.com/bilibili/discovery/master/cmd/discovery/discovery-example.toml -o $GOPATH/bin/discovery.toml - nohup bash -c "$GOPATH/bin/discovery -conf $GOPATH/bin/discovery.toml &" + # zookeeper + - sudo ./zk/bin/zkServer.sh start ./zk/conf/zoo_sample.cfg 1> /dev/null script: - go build ./... diff --git a/go.mod b/go.mod index f6c991787..2545b49b5 100644 --- a/go.mod +++ b/go.mod @@ -18,6 +18,7 @@ require ( github.com/go-playground/locales v0.12.1 // indirect github.com/go-playground/universal-translator v0.16.0 // indirect github.com/go-sql-driver/mysql v1.4.1 + github.com/go-zookeeper/zk v1.0.1 github.com/gobuffalo/packr/v2 v2.7.1 github.com/gogo/protobuf v1.3.0 github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect @@ -28,6 +29,7 @@ require ( github.com/leodido/go-urn v1.1.0 // indirect github.com/mattn/go-colorable v0.1.4 // indirect github.com/mattn/go-isatty v0.0.10 // indirect + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/montanaflynn/stats v0.5.0 github.com/openzipkin/zipkin-go v0.2.1 github.com/otokaze/mock v0.0.0-20190125081256-8282b7a7c7c3 diff --git a/go.sum b/go.sum index 67c9d0247..254dac4da 100644 --- a/go.sum +++ b/go.sum @@ -80,6 +80,8 @@ github.com/go-playground/universal-translator v0.16.0/go.mod h1:1AnU7NaIRDWWzGEK github.com/go-sql-driver/mysql v1.4.1 h1:g24URVg0OFbNUTx9qqY1IRZ9D9z3iPyi5zKhQZpNwpA= github.com/go-sql-driver/mysql v1.4.1/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= +github.com/go-zookeeper/zk v1.0.1 h1:LmXNmSnkNsNKai+aDu6sHRr8ZJzIrHJo8z8Z4sm8cT8= +github.com/go-zookeeper/zk v1.0.1/go.mod h1:gpJdHazfkmlg4V0rt0vYeHYJHSL8hHFwV0qOd+HRTJE= github.com/gobuffalo/envy v1.7.0/go.mod h1:n7DRkBerg/aorDM8kbduw5dN3oXGswK5liaSCx4T5NI= github.com/gobuffalo/envy v1.7.1 h1:OQl5ys5MBea7OGCdvPbBJWRgnhC/fGona6QKfvFeau8= github.com/gobuffalo/envy v1.7.1/go.mod h1:FurDp9+EDPE4aIUS3ZLyD+7/9fpx7YRt/ukY6jIHf0w= diff --git a/pkg/conf/paladin/apollo/apollo_test.go b/pkg/conf/paladin/apollo/apollo_test.go index cef79cb10..bb97f5d22 100644 --- a/pkg/conf/paladin/apollo/apollo_test.go +++ b/pkg/conf/paladin/apollo/apollo_test.go @@ -43,7 +43,7 @@ func TestApollo(t *testing.T) { os.Setenv("APOLLO_APP_ID", "SampleApp") os.Setenv("APOLLO_CLUSTER", "default") os.Setenv("APOLLO_CACHE_DIR", "/tmp") - os.Setenv("APOLLO_META_ADDR", "localhost:8080") + os.Setenv("APOLLO_META_ADDR", "localhost:8010") os.Setenv("APOLLO_NAMESPACES", fmt.Sprintf("%s,%s", testAppYAML, testClientJSON)) mockserver.Set(testAppYAML, "content", testAppYAMLContent1) mockserver.Set(testClientJSON, "content", testClientJSONContent) diff --git a/pkg/conf/paladin/apollo/internal/mockserver/mockserver.go b/pkg/conf/paladin/apollo/internal/mockserver/mockserver.go index 5ed0bc2ba..cca5b61ab 100644 --- a/pkg/conf/paladin/apollo/internal/mockserver/mockserver.go +++ b/pkg/conf/paladin/apollo/internal/mockserver/mockserver.go @@ -137,7 +137,7 @@ func initServer() { mux.Handle("/notifications/", http.HandlerFunc(server.NotificationHandler)) mux.Handle("/configs/", http.HandlerFunc(server.ConfigHandler)) server.server.Handler = mux - server.server.Addr = ":8080" + server.server.Addr = ":8010" } // Close mock server diff --git a/pkg/naming/zookeeper/zookeeper.go b/pkg/naming/zookeeper/zookeeper.go new file mode 100644 index 000000000..b6221d052 --- /dev/null +++ b/pkg/naming/zookeeper/zookeeper.go @@ -0,0 +1,396 @@ +package zookeeper + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/url" + "path" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/bilibili/kratos/pkg/log" + "github.com/bilibili/kratos/pkg/naming" + xtime "github.com/bilibili/kratos/pkg/time" + "github.com/go-zookeeper/zk" +) + +// Config is zookeeper config. +type Config struct { + Root string `json:"root"` + Endpoints []string `json:"endpoints"` + Timeout xtime.Duration `json:"timeout"` +} + +var ( + _once sync.Once + _builder naming.Builder + + // ErrDuplication is a register duplication err + ErrDuplication = errors.New("zookeeper: instance duplicate registration") +) + +// Builder return default zookeeper resolver builder. +func Builder(c *Config) naming.Builder { + _once.Do(func() { + _builder, _ = New(c) + }) + return _builder +} + +// Build register resolver into default zookeeper. +func Build(c *Config, id string) naming.Resolver { + return Builder(c).Build(id) +} + +type appInfo struct { + resolver map[*Resolve]struct{} + ins atomic.Value + zkb *Zookeeper + once sync.Once +} + +// Resolve zookeeper resolver. +type Resolve struct { + id string + event chan struct{} + zkb *Zookeeper +} + +// Zookeeper is a zookeeper client Builder. +// path: /{root}/{appid}/{ip} -> json(instance) +type Zookeeper struct { + c *Config + cli *zk.Conn + connEvent <-chan zk.Event + ctx context.Context + cancelFunc context.CancelFunc + + mutex sync.RWMutex + apps map[string]*appInfo + registry map[string]struct{} +} + +// New is new a zookeeper builder. +func New(c *Config) (zkb *Zookeeper, err error) { + if c.Timeout == 0 { + c.Timeout = xtime.Duration(time.Second) + } + if len(c.Endpoints) == 0 { + errInfo := fmt.Sprintf("zookeeper New failed, endpoints is null") + log.Error(errInfo) + return nil, errors.New(errInfo) + } + + zkConn, connEvent, err := zk.Connect(c.Endpoints, time.Duration(c.Timeout)) + if err != nil { + log.Error(fmt.Sprintf("zk Connect err:(%v)", err)) + return + } + log.Info(fmt.Sprintf("zk Connect ok!")) + + ctx, cancel := context.WithCancel(context.Background()) + zkb = &Zookeeper{ + c: c, + cli: zkConn, + connEvent: connEvent, + ctx: ctx, + cancelFunc: cancel, + apps: map[string]*appInfo{}, + registry: map[string]struct{}{}, + } + return +} + +// Build zookeeper resovler builder. +func (z *Zookeeper) Build(appid string) naming.Resolver { + r := &Resolve{ + id: appid, + zkb: z, + event: make(chan struct{}, 1), + } + z.mutex.Lock() + app, ok := z.apps[appid] + if !ok { + app = &appInfo{ + resolver: make(map[*Resolve]struct{}), + zkb: z, + } + z.apps[appid] = app + } + app.resolver[r] = struct{}{} + z.mutex.Unlock() + if ok { + select { + case r.event <- struct{}{}: + default: + } + } + app.once.Do(func() { + go app.watch(appid) + }) + return r +} + +// Scheme return zookeeper's scheme. +func (z *Zookeeper) Scheme() string { + return "zookeeper" +} + +// Register is register instance. +func (z *Zookeeper) Register(ctx context.Context, ins *naming.Instance) (cancelFunc context.CancelFunc, err error) { + z.mutex.Lock() + if _, ok := z.registry[ins.AppID]; ok { + err = ErrDuplication + } else { + z.registry[ins.AppID] = struct{}{} + } + z.mutex.Unlock() + if err != nil { + return + } + ctx, cancel := context.WithCancel(z.ctx) + if err = z.register(ctx, ins); err != nil { + z.mutex.Lock() + delete(z.registry, ins.AppID) + z.mutex.Unlock() + cancel() + return + } + ch := make(chan struct{}, 1) + cancelFunc = context.CancelFunc(func() { + cancel() + <-ch + }) + go func() { + for { + select { + case connEvent := <-z.connEvent: + log.Info("watch zkClient state, connEvent:(%+v)", connEvent) + if connEvent.State == zk.StateHasSession { + if err = z.register(ctx, ins); err != nil { + log.Warn(fmt.Sprintf("watch zkClient state, fail to register node error:(%v)", err)) + continue + } + } + case <-ctx.Done(): + ch <- struct{}{} + return + } + } + }() + return +} + +func (z *Zookeeper) createPath(paths string) error { + var ( + lastPath = "/" + seps = strings.Split(paths, "/") + ) + for _, part := range seps { + if part == "" { + continue + } + lastPath = path.Join(lastPath, part) + ok, _, err := z.cli.Exists(lastPath) + if err != nil { + return err + } + if ok { + continue + } + ret, err := z.cli.Create(lastPath, nil, 0, zk.WorldACL(zk.PermAll)) + if err != nil { + log.Warn(fmt.Sprintf("createPath, fail to Create node:(%s). error:(%v)", paths, err)) + } else { + log.Info(fmt.Sprintf("createPath, succeed to Create node:(%s). retStr:(%s)", paths, ret)) + } + } + return nil +} + +func (z *Zookeeper) registerPeerServer(nodePath string, ins *naming.Instance) (err error) { + var ( + str string + ) + val, err := json.Marshal(ins) + if err != nil { + return + } + log.Info(fmt.Sprintf("registerPeerServer, ins after json.Marshal:(%v)", string(val))) + ok, _, err := z.cli.Exists(nodePath) + if err != nil { + return err + } + if ok { + return nil + } + str, err = z.cli.Create(nodePath, val, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) + if err != nil { + log.Warn(fmt.Sprintf("registerPeerServer, fail to Create node:%s. error:(%v)", nodePath, err)) + } else { + log.Info(fmt.Sprintf("registerPeerServer, succeed to Create node:%s. retStr:(%s)", nodePath, str)) + } + return +} + +// register is register instance to zookeeper. +func (z *Zookeeper) register(ctx context.Context, ins *naming.Instance) (err error) { + log.Info("zookeeper register enter, instance Addrs:(%v)", ins.Addrs) + + prefix := z.keyPrefix(ins.AppID) + if err = z.createPath(prefix); err != nil { + log.Warn(fmt.Sprintf("register, fail to createPath node error:(%v)", err)) + } + for _, addr := range ins.Addrs { + u, err := url.Parse(addr) + if err != nil { + continue + } + // grpc://127.0.0.1:8000 to 127.0.0.1 + nodePath := prefix + "/" + strings.SplitN(u.Host, ":", 2)[0] + if err = z.registerPeerServer(nodePath, ins); err != nil { + log.Warn(fmt.Sprintf("registerServer, fail to RegisterPeerServer node:%s error:(%v)", addr, err)) + } else { + log.Info(fmt.Sprintf("registerServer, succeed to RegistServer node.")) + } + } + return nil +} + +func (z *Zookeeper) unregister(ins *naming.Instance) (err error) { + log.Info("zookeeper unregister enter, instance Addrs:(%v)", ins.Addrs) + prefix := z.keyPrefix(ins.AppID) + for _, addr := range ins.Addrs { + u, err := url.Parse(addr) + if err != nil { + continue + } + // grpc://127.0.0.1:8000 to 127.0.0.1 + nodePath := prefix + "/" + strings.SplitN(u.Host, ":", 2)[0] + exists, _, err := z.cli.Exists(nodePath) + if err != nil { + log.Error("zk.Conn.Exists node:(%v), error:(%v)", nodePath, err) + continue + } + if exists { + _, s, err := z.cli.Get(nodePath) + if err != nil { + log.Error("zk.Conn.Get node:(%s), error:(%v)", nodePath, err) + continue + } + if err = z.cli.Delete(nodePath, s.Version); err != nil { + log.Error("zk.Conn.Delete node:(%s), error:(%v)", nodePath, err) + continue + } + } + + log.Info(fmt.Sprintf("unregister, client.Delete:(%v), appid:(%v), hostname:(%v) success", nodePath, ins.AppID, ins.Hostname)) + } + return +} + +func (z *Zookeeper) keyPrefix(appID string) string { + return path.Join(z.c.Root, appID) +} + +// Close stop all running process including zk fetch and register. +func (z *Zookeeper) Close() error { + z.cancelFunc() + return nil +} + +func (a *appInfo) watch(appID string) { + _ = a.fetchstore(appID) + go func() { + prefix := a.zkb.keyPrefix(appID) + for { + log.Info(fmt.Sprintf("zk ChildrenW enter, prefix:(%v)", prefix)) + snapshot, _, event, err := a.zkb.cli.ChildrenW(prefix) + if err != nil { + log.Error("zk ChildrenW fail to watch:%s error:(%v)", prefix, err) + time.Sleep(time.Second) + _ = a.fetchstore(appID) + continue + } + log.Info(fmt.Sprintf("zk ChildrenW ok, prefix:%s snapshot:(%v)", prefix, snapshot)) + for ev := range event { + log.Info(fmt.Sprintf("zk ChildrenW ok, prefix:(%v), event Path:(%v), Type:(%v)", prefix, ev.Path, ev.Type)) + if ev.Type == zk.EventNodeChildrenChanged { + _ = a.fetchstore(appID) + } + } + } + }() +} + +func (a *appInfo) fetchstore(appID string) (err error) { + prefix := a.zkb.keyPrefix(appID) + childs, _, err := a.zkb.cli.Children(prefix) + if err != nil { + log.Error(fmt.Sprintf("fetchstore, fail to get Children of node:(%v), error:(%v)", prefix, err)) + return + } + log.Info(fmt.Sprintf("fetchstore, ok to get Children of node:(%v), childs:(%v)", prefix, childs)) + ins := &naming.InstancesInfo{ + Instances: make(map[string][]*naming.Instance, 0), + } + for _, child := range childs { + nodePath := prefix + "/" + child + resp, _, err := a.zkb.cli.Get(nodePath) + if err != nil { + log.Error("zookeeper: fetch client.Get(%s) error:(%v)", nodePath, err) + return err + } + in := new(naming.Instance) + if err = json.Unmarshal(resp, in); err != nil { + return err + } + ins.Instances[in.Zone] = append(ins.Instances[in.Zone], in) + + } + a.store(ins) + return nil +} + +func (a *appInfo) store(ins *naming.InstancesInfo) { + a.ins.Store(ins) + a.zkb.mutex.RLock() + for rs := range a.resolver { + select { + case rs.event <- struct{}{}: + default: + } + } + a.zkb.mutex.RUnlock() +} + +// Watch watch instance. +func (r *Resolve) Watch() <-chan struct{} { + return r.event +} + +// Fetch fetch resolver instance. +func (r *Resolve) Fetch(ctx context.Context) (ins *naming.InstancesInfo, ok bool) { + r.zkb.mutex.RLock() + app, ok := r.zkb.apps[r.id] + r.zkb.mutex.RUnlock() + if ok { + ins, ok = app.ins.Load().(*naming.InstancesInfo) + return + } + return +} + +// Close close resolver. +func (r *Resolve) Close() error { + r.zkb.mutex.Lock() + if app, ok := r.zkb.apps[r.id]; ok && len(app.resolver) != 0 { + delete(app.resolver, r) + } + r.zkb.mutex.Unlock() + return nil +} diff --git a/pkg/naming/zookeeper/zookeeper_test.go b/pkg/naming/zookeeper/zookeeper_test.go new file mode 100644 index 000000000..0539d8f2a --- /dev/null +++ b/pkg/naming/zookeeper/zookeeper_test.go @@ -0,0 +1,46 @@ +package zookeeper + +import ( + "context" + "testing" + + "github.com/bilibili/kratos/pkg/naming" +) + +var ( + _testAppid = "test_appid" + + _testConf = &Config{ + Root: "/test", + Endpoints: []string{"127.0.0.1:2181"}, + } + _testIns = &naming.Instance{ + AppID: _testAppid, + Addrs: []string{"grpc://127.0.0.1:9000"}, + Metadata: map[string]string{ + "test_key": "test_value", + }, + } +) + +func TestZookeeper(t *testing.T) { + zk, err := New(_testConf) + if err != nil { + t.Fatal(err) + } + _, err = zk.Register(context.TODO(), _testIns) + if err != nil { + t.Fatal(err) + } + // fetch&watch + res := zk.Build(_testAppid) + event := res.Watch() + <-event + in, ok := res.Fetch(context.TODO()) + if !ok { + t.Fatal("failed to resolver fetch") + } + if len(in.Instances) != 1 { + t.Fatalf("Instances not match, got:%d want:1", len(in.Instances)) + } +} diff --git a/pkg/net/rpc/warden/resolver/direct/test/direct_test.go b/pkg/net/rpc/warden/resolver/direct/test/direct_test.go index b898fd643..cc3b65112 100644 --- a/pkg/net/rpc/warden/resolver/direct/test/direct_test.go +++ b/pkg/net/rpc/warden/resolver/direct/test/direct_test.go @@ -42,8 +42,8 @@ func createServer(name, listen string) *warden.Server { func TestMain(m *testing.M) { resolver.Register(direct.New()) ctx := context.TODO() - s1 := createServer("server1", "127.0.0.1:18081") - s2 := createServer("server2", "127.0.0.1:18082") + s1 := createServer("server1", "127.0.0.1:18001") + s2 := createServer("server2", "127.0.0.1:18002") defer s1.Shutdown(ctx) defer s2.Shutdown(ctx) os.Exit(m.Run()) @@ -68,7 +68,7 @@ func createTestClient(t *testing.T, connStr string) pb.GreeterClient { } func TestDirect(t *testing.T) { - cli := createTestClient(t, "direct://default/127.0.0.1:18083,127.0.0.1:18082") + cli := createTestClient(t, "direct://default/127.0.0.1:18003,127.0.0.1:18002") count := 0 for i := 0; i < 10; i++ { if resp, err := cli.SayHello(context.TODO(), &pb.HelloRequest{Age: 1, Name: "hello"}); err != nil { diff --git a/pkg/net/rpc/warden/server_test.go b/pkg/net/rpc/warden/server_test.go index 5d0ea2afa..18c508685 100644 --- a/pkg/net/rpc/warden/server_test.go +++ b/pkg/net/rpc/warden/server_test.go @@ -30,6 +30,8 @@ import ( const ( _separator = "\001" + + _testAddr = "127.0.0.1:9090" ) var ( @@ -156,7 +158,7 @@ func (s *helloServer) StreamHello(ss pb.Greeter_StreamHelloServer) error { func runServer(t *testing.T, interceptors ...grpc.UnaryServerInterceptor) func() { return func() { - server = NewServer(&ServerConfig{Addr: "127.0.0.1:8080", Timeout: xtime.Duration(time.Second)}) + server = NewServer(&ServerConfig{Addr: _testAddr, Timeout: xtime.Duration(time.Second)}) pb.RegisterGreeterServer(server.Server(), &helloServer{t}) server.Use( func(ctx context.Context, req interface{}, args *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { @@ -180,7 +182,7 @@ func runServer(t *testing.T, interceptors ...grpc.UnaryServerInterceptor) func() func runClient(ctx context.Context, cc *ClientConfig, t *testing.T, name string, age int32, interceptors ...grpc.UnaryClientInterceptor) (resp *pb.HelloReply, err error) { client := NewClient(cc) client.Use(interceptors...) - conn, err := client.Dial(context.Background(), "127.0.0.1:8080") + conn, err := client.Dial(context.Background(), _testAddr) if err != nil { panic(fmt.Errorf("did not connect: %v,req: %v %v", err, name, age)) } @@ -226,7 +228,7 @@ func testTimeoutOpt(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) defer cancel() client := NewClient(&clientConfig) - conn, err := client.Dial(ctx, "127.0.0.1:8080") + conn, err := client.Dial(ctx, _testAddr) if err != nil { t.Fatalf("did not connect: %v", err) } @@ -284,7 +286,7 @@ func testAllErrorCase(t *testing.T) { func testBreaker(t *testing.T) { client := NewClient(&clientConfig) - conn, err := client.Dial(context.Background(), "127.0.0.1:8080") + conn, err := client.Dial(context.Background(), _testAddr) if err != nil { t.Fatalf("did not connect: %v", err) } @@ -378,7 +380,7 @@ func testClientRecovery(t *testing.T) { panic("client recovery test") }) - conn, err := client.Dial(ctx, "127.0.0.1:8080") + conn, err := client.Dial(ctx, _testAddr) if err != nil { t.Fatalf("did not connect: %v", err) } @@ -403,7 +405,7 @@ func testServerRecovery(t *testing.T) { ctx := context.Background() client := NewClient(&clientConfig) - conn, err := client.Dial(ctx, "127.0.0.1:8080") + conn, err := client.Dial(ctx, _testAddr) if err != nil { t.Fatalf("did not connect: %v", err) } @@ -491,7 +493,7 @@ func testTrace(t *testing.T, port int, isStream bool) { } func BenchmarkServer(b *testing.B) { - server := NewServer(&ServerConfig{Addr: "127.0.0.1:8080", Timeout: xtime.Duration(time.Second)}) + server := NewServer(&ServerConfig{Addr: _testAddr, Timeout: xtime.Duration(time.Second)}) go func() { pb.RegisterGreeterServer(server.Server(), &helloServer{}) if _, err := server.Start(); err != nil { @@ -503,7 +505,7 @@ func BenchmarkServer(b *testing.B) { server.Server().Stop() }() client := NewClient(&clientConfig) - conn, err := client.Dial(context.Background(), "127.0.0.1:8080") + conn, err := client.Dial(context.Background(), _testAddr) if err != nil { conn.Close() b.Fatalf("did not connect: %v", err) @@ -602,4 +604,3 @@ func TestStartWithAddr(t *testing.T) { assert.Nil(t, err) } } -