From 264e0d6c06b369645c84a2ee8b176987fb1aad63 Mon Sep 17 00:00:00 2001 From: Tony Date: Tue, 5 Nov 2019 22:55:44 +0800 Subject: [PATCH] fix empty path --- pkg/naming/zookeeper/zookeeper.go | 54 +++++++++++++++++--------- pkg/naming/zookeeper/zookeeper_test.go | 2 +- 2 files changed, 36 insertions(+), 20 deletions(-) diff --git a/pkg/naming/zookeeper/zookeeper.go b/pkg/naming/zookeeper/zookeeper.go index 92662a058..cbcffa3e0 100644 --- a/pkg/naming/zookeeper/zookeeper.go +++ b/pkg/naming/zookeeper/zookeeper.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "net/url" "path" "strings" "sync" @@ -203,14 +204,14 @@ func (z *Zookeeper) createPath(paths string) error { ret, err := z.cli.Create(lastPath, nil, 0, zk.WorldACL(zk.PermAll)) if err != nil { log.Warn(fmt.Sprintf("createPath, fail to Create node:(%s). error:(%v)", paths, err)) - return err + } else { + log.Info(fmt.Sprintf("createPath, succeed to Create node:(%s). retStr:(%s)", paths, ret)) } - log.Info(fmt.Sprintf("createPath, succeed to Create node:(%s). retStr:(%s)", paths, ret)) } return nil } -func (z *Zookeeper) registerPeerServer(name, host string, ins *naming.Instance) (err error) { +func (z *Zookeeper) registerPeerServer(nodePath string, ins *naming.Instance) (err error) { var ( str string ) @@ -219,11 +220,18 @@ func (z *Zookeeper) registerPeerServer(name, host string, ins *naming.Instance) return } log.Info(fmt.Sprintf("registerPeerServer, ins after json.Marshal:(%v)", string(val))) - str, err = z.cli.Create(name+host, val, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) + ok, _, err := z.cli.Exists(nodePath) if err != nil { - log.Warn(fmt.Sprintf("registerPeerServer, fail to Create node:%s. error:(%v)", name+host, err)) + return err + } + if ok { + return nil + } + str, err = z.cli.Create(nodePath, val, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) + if err != nil { + log.Warn(fmt.Sprintf("registerPeerServer, fail to Create node:%s. error:(%v)", nodePath, err)) } else { - log.Info(fmt.Sprintf("registerPeerServer, succeed to Create node:%s. retStr:(%s)", name+host, str)) + log.Info(fmt.Sprintf("registerPeerServer, succeed to Create node:%s. retStr:(%s)", nodePath, str)) } return } @@ -237,8 +245,12 @@ func (z *Zookeeper) register(ctx context.Context, ins *naming.Instance) (err err log.Warn(fmt.Sprintf("register, fail to createPath node error:(%v)", err)) } for _, addr := range ins.Addrs { - addr = strings.Replace(addr, "://", ":", 1) - if err = z.registerPeerServer(prefix, "/"+addr, ins); err != nil { + u, err := url.Parse(addr) + if err != nil { + continue + } + nodePath := prefix + "/" + strings.SplitN(u.Host, ":", 2)[0] + if err = z.registerPeerServer(nodePath, ins); err != nil { log.Warn(fmt.Sprintf("registerServer, fail to RegisterPeerServer node:%s error:(%v)", addr, err)) } else { log.Info(fmt.Sprintf("registerServer, succeed to RegistServer node.")) @@ -251,26 +263,29 @@ func (z *Zookeeper) unregister(ins *naming.Instance) (err error) { log.Info("zookeeper unregister enter, instance Addrs:(%v)", ins.Addrs) prefix := z.keyPrefix(ins.AppID) for _, addr := range ins.Addrs { - addr = strings.Replace(addr, ":", "://", 1) - strNode := prefix + "/" + addr - exists, _, err := z.cli.Exists(strNode) + u, err := url.Parse(addr) + if err != nil { + continue + } + nodePath := prefix + "/" + strings.SplitN(u.Host, ":", 2)[0] + exists, _, err := z.cli.Exists(nodePath) if err != nil { - log.Error("zk.Conn.Exists node:(%v), error:(%v)", strNode, err) + log.Error("zk.Conn.Exists node:(%v), error:(%v)", nodePath, err) continue } if exists { - _, s, err := z.cli.Get(strNode) + _, s, err := z.cli.Get(nodePath) if err != nil { - log.Error("zk.Conn.Get node:(%s), error:(%v)", strNode, err) + log.Error("zk.Conn.Get node:(%s), error:(%v)", nodePath, err) continue } - if err = z.cli.Delete(strNode, s.Version); err != nil { - log.Error("zk.Conn.Delete node:(%s), error:(%v)", strNode, err) + if err = z.cli.Delete(nodePath, s.Version); err != nil { + log.Error("zk.Conn.Delete node:(%s), error:(%v)", nodePath, err) continue } } - log.Info(fmt.Sprintf("unregister, client.Delete:(%v), appid:(%v), hostname:(%v) success", strNode, ins.AppID, ins.Hostname)) + log.Info(fmt.Sprintf("unregister, client.Delete:(%v), appid:(%v), hostname:(%v) success", nodePath, ins.AppID, ins.Hostname)) } return } @@ -287,8 +302,8 @@ func (z *Zookeeper) Close() error { func (a *appInfo) watch(appID string) { _ = a.fetchstore(appID) - prefix := a.zkb.keyPrefix(appID) go func() { + prefix := a.zkb.keyPrefix(appID) for { log.Info(fmt.Sprintf("zk ChildrenW enter, prefix:(%v)", prefix)) snapshot, _, event, err := a.zkb.cli.ChildrenW(prefix) @@ -297,7 +312,7 @@ func (a *appInfo) watch(appID string) { time.Sleep(time.Second) continue } - log.Info(fmt.Sprintf("zk ChildrenW ok, snapshot:(%v)", snapshot)) + log.Info(fmt.Sprintf("zk ChildrenW ok, prefix:%s snapshot:(%v)", prefix, snapshot)) for ev := range event { log.Info(fmt.Sprintf("zk ChildrenW ok, prefix:(%v), event Path:(%v), Type:(%v)", prefix, ev.Path, ev.Type)) if ev.Type == zk.EventNodeChildrenChanged { @@ -310,6 +325,7 @@ func (a *appInfo) watch(appID string) { func (a *appInfo) fetchstore(appID string) (err error) { prefix := a.zkb.keyPrefix(appID) + a.zkb.createPath(prefix) childs, _, err := a.zkb.cli.Children(prefix) if err != nil { log.Error(fmt.Sprintf("fetchstore, fail to get Children of node:(%v), error:(%v)", prefix, err)) diff --git a/pkg/naming/zookeeper/zookeeper_test.go b/pkg/naming/zookeeper/zookeeper_test.go index d958527af..5b5cc0ec0 100644 --- a/pkg/naming/zookeeper/zookeeper_test.go +++ b/pkg/naming/zookeeper/zookeeper_test.go @@ -39,7 +39,7 @@ func TestZookeeper(t *testing.T) { if err != nil { t.Fatal(err) } - time.Sleep(time.Second) + time.Sleep(time.Second * 3) in, ok := res.Fetch(context.TODO()) if !ok { t.Fatal("failed to resolver fetch")