From 4b11f444192a4690a2b46f4e1c5fd6b0bff16937 Mon Sep 17 00:00:00 2001 From: Tony Date: Tue, 5 Nov 2019 23:16:37 +0800 Subject: [PATCH] fix watch event --- pkg/naming/zookeeper/zookeeper.go | 18 +++++++++--------- pkg/naming/zookeeper/zookeeper_test.go | 10 +++------- 2 files changed, 12 insertions(+), 16 deletions(-) diff --git a/pkg/naming/zookeeper/zookeeper.go b/pkg/naming/zookeeper/zookeeper.go index cbcffa3e0..92a203ba0 100644 --- a/pkg/naming/zookeeper/zookeeper.go +++ b/pkg/naming/zookeeper/zookeeper.go @@ -249,6 +249,7 @@ func (z *Zookeeper) register(ctx context.Context, ins *naming.Instance) (err err if err != nil { continue } + // grpc://127.0.0.1:8000 to 127.0.0.1 nodePath := prefix + "/" + strings.SplitN(u.Host, ":", 2)[0] if err = z.registerPeerServer(nodePath, ins); err != nil { log.Warn(fmt.Sprintf("registerServer, fail to RegisterPeerServer node:%s error:(%v)", addr, err)) @@ -267,6 +268,7 @@ func (z *Zookeeper) unregister(ins *naming.Instance) (err error) { if err != nil { continue } + // grpc://127.0.0.1:8000 to 127.0.0.1 nodePath := prefix + "/" + strings.SplitN(u.Host, ":", 2)[0] exists, _, err := z.cli.Exists(nodePath) if err != nil { @@ -310,6 +312,7 @@ func (a *appInfo) watch(appID string) { if err != nil { log.Error("zk ChildrenW fail to watch:%s error:(%v)", prefix, err) time.Sleep(time.Second) + _ = a.fetchstore(appID) continue } log.Info(fmt.Sprintf("zk ChildrenW ok, prefix:%s snapshot:(%v)", prefix, snapshot)) @@ -325,27 +328,24 @@ func (a *appInfo) watch(appID string) { func (a *appInfo) fetchstore(appID string) (err error) { prefix := a.zkb.keyPrefix(appID) - a.zkb.createPath(prefix) childs, _, err := a.zkb.cli.Children(prefix) if err != nil { log.Error(fmt.Sprintf("fetchstore, fail to get Children of node:(%v), error:(%v)", prefix, err)) - } else { - log.Info(fmt.Sprintf("fetchstore, ok to get Children of node:(%v), childs:(%v)", prefix, childs)) + return } + log.Info(fmt.Sprintf("fetchstore, ok to get Children of node:(%v), childs:(%v)", prefix, childs)) ins := &naming.InstancesInfo{ Instances: make(map[string][]*naming.Instance, 0), } - strNode := "" for _, child := range childs { - strNode = prefix + "/" + child - resp, _, err := a.zkb.cli.Get(strNode) + nodePath := prefix + "/" + child + resp, _, err := a.zkb.cli.Get(nodePath) if err != nil { - log.Error("zookeeper: fetch client.Get(%s) error:(%v)", strNode, err) + log.Error("zookeeper: fetch client.Get(%s) error:(%v)", nodePath, err) return err } in := new(naming.Instance) - err = json.Unmarshal(resp, in) - if err != nil { + if err = json.Unmarshal(resp, in); err != nil { return err } ins.Instances[in.Zone] = append(ins.Instances[in.Zone], in) diff --git a/pkg/naming/zookeeper/zookeeper_test.go b/pkg/naming/zookeeper/zookeeper_test.go index 5b5cc0ec0..192bc1883 100644 --- a/pkg/naming/zookeeper/zookeeper_test.go +++ b/pkg/naming/zookeeper/zookeeper_test.go @@ -3,7 +3,6 @@ package zookeeper import ( "context" "testing" - "time" "github.com/bilibili/kratos/pkg/naming" ) @@ -30,16 +29,13 @@ func TestZookeeper(t *testing.T) { t.Fatal(err) } res := zk.Build(_testAppid) - go func() { - for event := range res.Watch() { - t.Log(event) - } - }() + event := res.Watch() _, err = zk.Register(context.TODO(), _testIns) if err != nil { t.Fatal(err) } - time.Sleep(time.Second * 3) + <-event + <-event in, ok := res.Fetch(context.TODO()) if !ok { t.Fatal("failed to resolver fetch")