|
|
@ -5,6 +5,7 @@ import ( |
|
|
|
"encoding/json" |
|
|
|
"encoding/json" |
|
|
|
"errors" |
|
|
|
"errors" |
|
|
|
"fmt" |
|
|
|
"fmt" |
|
|
|
|
|
|
|
"net/url" |
|
|
|
"path" |
|
|
|
"path" |
|
|
|
"strings" |
|
|
|
"strings" |
|
|
|
"sync" |
|
|
|
"sync" |
|
|
@ -203,14 +204,14 @@ func (z *Zookeeper) createPath(paths string) error { |
|
|
|
ret, err := z.cli.Create(lastPath, nil, 0, zk.WorldACL(zk.PermAll)) |
|
|
|
ret, err := z.cli.Create(lastPath, nil, 0, zk.WorldACL(zk.PermAll)) |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
log.Warn(fmt.Sprintf("createPath, fail to Create node:(%s). error:(%v)", paths, err)) |
|
|
|
log.Warn(fmt.Sprintf("createPath, fail to Create node:(%s). error:(%v)", paths, err)) |
|
|
|
return err |
|
|
|
} else { |
|
|
|
|
|
|
|
log.Info(fmt.Sprintf("createPath, succeed to Create node:(%s). retStr:(%s)", paths, ret)) |
|
|
|
} |
|
|
|
} |
|
|
|
log.Info(fmt.Sprintf("createPath, succeed to Create node:(%s). retStr:(%s)", paths, ret)) |
|
|
|
|
|
|
|
} |
|
|
|
} |
|
|
|
return nil |
|
|
|
return nil |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
func (z *Zookeeper) registerPeerServer(name, host string, ins *naming.Instance) (err error) { |
|
|
|
func (z *Zookeeper) registerPeerServer(nodePath string, ins *naming.Instance) (err error) { |
|
|
|
var ( |
|
|
|
var ( |
|
|
|
str string |
|
|
|
str string |
|
|
|
) |
|
|
|
) |
|
|
@ -219,11 +220,18 @@ func (z *Zookeeper) registerPeerServer(name, host string, ins *naming.Instance) |
|
|
|
return |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
|
log.Info(fmt.Sprintf("registerPeerServer, ins after json.Marshal:(%v)", string(val))) |
|
|
|
log.Info(fmt.Sprintf("registerPeerServer, ins after json.Marshal:(%v)", string(val))) |
|
|
|
str, err = z.cli.Create(name+host, val, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) |
|
|
|
ok, _, err := z.cli.Exists(nodePath) |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
log.Warn(fmt.Sprintf("registerPeerServer, fail to Create node:%s. error:(%v)", name+host, err)) |
|
|
|
return err |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
if ok { |
|
|
|
|
|
|
|
return nil |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
str, err = z.cli.Create(nodePath, val, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) |
|
|
|
|
|
|
|
if err != nil { |
|
|
|
|
|
|
|
log.Warn(fmt.Sprintf("registerPeerServer, fail to Create node:%s. error:(%v)", nodePath, err)) |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
log.Info(fmt.Sprintf("registerPeerServer, succeed to Create node:%s. retStr:(%s)", name+host, str)) |
|
|
|
log.Info(fmt.Sprintf("registerPeerServer, succeed to Create node:%s. retStr:(%s)", nodePath, str)) |
|
|
|
} |
|
|
|
} |
|
|
|
return |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
@ -237,8 +245,12 @@ func (z *Zookeeper) register(ctx context.Context, ins *naming.Instance) (err err |
|
|
|
log.Warn(fmt.Sprintf("register, fail to createPath node error:(%v)", err)) |
|
|
|
log.Warn(fmt.Sprintf("register, fail to createPath node error:(%v)", err)) |
|
|
|
} |
|
|
|
} |
|
|
|
for _, addr := range ins.Addrs { |
|
|
|
for _, addr := range ins.Addrs { |
|
|
|
addr = strings.Replace(addr, "://", ":", 1) |
|
|
|
u, err := url.Parse(addr) |
|
|
|
if err = z.registerPeerServer(prefix, "/"+addr, ins); err != nil { |
|
|
|
if err != nil { |
|
|
|
|
|
|
|
continue |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
nodePath := prefix + "/" + strings.SplitN(u.Host, ":", 2)[0] |
|
|
|
|
|
|
|
if err = z.registerPeerServer(nodePath, ins); err != nil { |
|
|
|
log.Warn(fmt.Sprintf("registerServer, fail to RegisterPeerServer node:%s error:(%v)", addr, err)) |
|
|
|
log.Warn(fmt.Sprintf("registerServer, fail to RegisterPeerServer node:%s error:(%v)", addr, err)) |
|
|
|
} else { |
|
|
|
} else { |
|
|
|
log.Info(fmt.Sprintf("registerServer, succeed to RegistServer node.")) |
|
|
|
log.Info(fmt.Sprintf("registerServer, succeed to RegistServer node.")) |
|
|
@ -251,26 +263,29 @@ func (z *Zookeeper) unregister(ins *naming.Instance) (err error) { |
|
|
|
log.Info("zookeeper unregister enter, instance Addrs:(%v)", ins.Addrs) |
|
|
|
log.Info("zookeeper unregister enter, instance Addrs:(%v)", ins.Addrs) |
|
|
|
prefix := z.keyPrefix(ins.AppID) |
|
|
|
prefix := z.keyPrefix(ins.AppID) |
|
|
|
for _, addr := range ins.Addrs { |
|
|
|
for _, addr := range ins.Addrs { |
|
|
|
addr = strings.Replace(addr, ":", "://", 1) |
|
|
|
u, err := url.Parse(addr) |
|
|
|
strNode := prefix + "/" + addr |
|
|
|
if err != nil { |
|
|
|
exists, _, err := z.cli.Exists(strNode) |
|
|
|
continue |
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
nodePath := prefix + "/" + strings.SplitN(u.Host, ":", 2)[0] |
|
|
|
|
|
|
|
exists, _, err := z.cli.Exists(nodePath) |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
log.Error("zk.Conn.Exists node:(%v), error:(%v)", strNode, err) |
|
|
|
log.Error("zk.Conn.Exists node:(%v), error:(%v)", nodePath, err) |
|
|
|
continue |
|
|
|
continue |
|
|
|
} |
|
|
|
} |
|
|
|
if exists { |
|
|
|
if exists { |
|
|
|
_, s, err := z.cli.Get(strNode) |
|
|
|
_, s, err := z.cli.Get(nodePath) |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
log.Error("zk.Conn.Get node:(%s), error:(%v)", strNode, err) |
|
|
|
log.Error("zk.Conn.Get node:(%s), error:(%v)", nodePath, err) |
|
|
|
continue |
|
|
|
continue |
|
|
|
} |
|
|
|
} |
|
|
|
if err = z.cli.Delete(strNode, s.Version); err != nil { |
|
|
|
if err = z.cli.Delete(nodePath, s.Version); err != nil { |
|
|
|
log.Error("zk.Conn.Delete node:(%s), error:(%v)", strNode, err) |
|
|
|
log.Error("zk.Conn.Delete node:(%s), error:(%v)", nodePath, err) |
|
|
|
continue |
|
|
|
continue |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
} |
|
|
|
|
|
|
|
|
|
|
|
log.Info(fmt.Sprintf("unregister, client.Delete:(%v), appid:(%v), hostname:(%v) success", strNode, ins.AppID, ins.Hostname)) |
|
|
|
log.Info(fmt.Sprintf("unregister, client.Delete:(%v), appid:(%v), hostname:(%v) success", nodePath, ins.AppID, ins.Hostname)) |
|
|
|
} |
|
|
|
} |
|
|
|
return |
|
|
|
return |
|
|
|
} |
|
|
|
} |
|
|
@ -287,8 +302,8 @@ func (z *Zookeeper) Close() error { |
|
|
|
|
|
|
|
|
|
|
|
func (a *appInfo) watch(appID string) { |
|
|
|
func (a *appInfo) watch(appID string) { |
|
|
|
_ = a.fetchstore(appID) |
|
|
|
_ = a.fetchstore(appID) |
|
|
|
prefix := a.zkb.keyPrefix(appID) |
|
|
|
|
|
|
|
go func() { |
|
|
|
go func() { |
|
|
|
|
|
|
|
prefix := a.zkb.keyPrefix(appID) |
|
|
|
for { |
|
|
|
for { |
|
|
|
log.Info(fmt.Sprintf("zk ChildrenW enter, prefix:(%v)", prefix)) |
|
|
|
log.Info(fmt.Sprintf("zk ChildrenW enter, prefix:(%v)", prefix)) |
|
|
|
snapshot, _, event, err := a.zkb.cli.ChildrenW(prefix) |
|
|
|
snapshot, _, event, err := a.zkb.cli.ChildrenW(prefix) |
|
|
@ -297,7 +312,7 @@ func (a *appInfo) watch(appID string) { |
|
|
|
time.Sleep(time.Second) |
|
|
|
time.Sleep(time.Second) |
|
|
|
continue |
|
|
|
continue |
|
|
|
} |
|
|
|
} |
|
|
|
log.Info(fmt.Sprintf("zk ChildrenW ok, snapshot:(%v)", snapshot)) |
|
|
|
log.Info(fmt.Sprintf("zk ChildrenW ok, prefix:%s snapshot:(%v)", prefix, snapshot)) |
|
|
|
for ev := range event { |
|
|
|
for ev := range event { |
|
|
|
log.Info(fmt.Sprintf("zk ChildrenW ok, prefix:(%v), event Path:(%v), Type:(%v)", prefix, ev.Path, ev.Type)) |
|
|
|
log.Info(fmt.Sprintf("zk ChildrenW ok, prefix:(%v), event Path:(%v), Type:(%v)", prefix, ev.Path, ev.Type)) |
|
|
|
if ev.Type == zk.EventNodeChildrenChanged { |
|
|
|
if ev.Type == zk.EventNodeChildrenChanged { |
|
|
@ -310,6 +325,7 @@ func (a *appInfo) watch(appID string) { |
|
|
|
|
|
|
|
|
|
|
|
func (a *appInfo) fetchstore(appID string) (err error) { |
|
|
|
func (a *appInfo) fetchstore(appID string) (err error) { |
|
|
|
prefix := a.zkb.keyPrefix(appID) |
|
|
|
prefix := a.zkb.keyPrefix(appID) |
|
|
|
|
|
|
|
a.zkb.createPath(prefix) |
|
|
|
childs, _, err := a.zkb.cli.Children(prefix) |
|
|
|
childs, _, err := a.zkb.cli.Children(prefix) |
|
|
|
if err != nil { |
|
|
|
if err != nil { |
|
|
|
log.Error(fmt.Sprintf("fetchstore, fail to get Children of node:(%v), error:(%v)", prefix, err)) |
|
|
|
log.Error(fmt.Sprintf("fetchstore, fail to get Children of node:(%v), error:(%v)", prefix, err)) |
|
|
|