zookeeper接入kratos 服务注册发现与负载均衡

1. 加入ctx.Done的情况防止groutine 泄露
2. 注册的协议,允许注册多种协议,具体选择哪种协议有client端选择
3. 去掉RootPath常量,避免歧义
pull/391/head
guobin 5 years ago
parent fa750fdff7
commit d9f57d5647
  1. 45
      pkg/naming/zookeeper/zookeeper.go

@ -5,7 +5,6 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"strings"
"sync" "sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -15,10 +14,6 @@ import (
"github.com/go-zookeeper/zk" "github.com/go-zookeeper/zk"
) )
const (
RootPath = "/"
)
type Config struct { type Config struct {
// Endpoints is a list of URLs. // Endpoints is a list of URLs.
Endpoints []string `json:"endpoints"` Endpoints []string `json:"endpoints"`
@ -176,7 +171,9 @@ func (z *ZookeeperBuilder) Register(ctx context.Context, ins *naming.Instance) (
continue continue
} }
} }
case <-time.After(time.Second): case <-ctx.Done():
ch <- struct{}{}
return
} }
} }
}() }()
@ -226,26 +223,14 @@ func (z *ZookeeperBuilder) register(ctx context.Context, ins *naming.Instance) (
log.Warn(fmt.Sprintf("register, fail to registerPerServer node error:(%v)", err)) log.Warn(fmt.Sprintf("register, fail to registerPerServer node error:(%v)", err))
} }
var svrAddr string
for _, val := range ins.Addrs { for _, val := range ins.Addrs {
if strings.HasPrefix(val, "grpc://") { err = z.registerEphServer(prefix, "/"+val, ins)
svrAddr = strings.TrimPrefix(val, "grpc://")
break
}
}
if svrAddr == "" {
errInfo := fmt.Sprintf("register, error occur, grpc svrAddr is null")
log.Error(errInfo)
return errors.New(errInfo)
}
err = z.registerEphServer(prefix, RootPath+svrAddr, ins)
if err != nil { if err != nil {
log.Error(fmt.Sprintf("registerServer, fail to RegisterEphServer node error:(%v)", err)) log.Warn(fmt.Sprintf("registerServer, fail to RegisterEphServer node error:(%v)", err))
//return
} else { } else {
log.Info(fmt.Sprintf("registerServer, succeed to RegistServer node.")) log.Info(fmt.Sprintf("registerServer, succeed to RegistServer node."))
} }
}
return nil return nil
} }
@ -255,20 +240,8 @@ func (z *ZookeeperBuilder) unregister(ins *naming.Instance) (err error) {
log.Info("zookeeper unregister enter, instance Addrs:(%v)", ins.Addrs) log.Info("zookeeper unregister enter, instance Addrs:(%v)", ins.Addrs)
prefix := z.keyPrefix(ins) prefix := z.keyPrefix(ins)
var svrAddr string
for _, val := range ins.Addrs { for _, val := range ins.Addrs {
if strings.HasPrefix(val, "grpc://") { strNode := prefix + "/" + val
svrAddr = strings.TrimPrefix(val, "grpc://")
break
}
}
if svrAddr == "" {
errInfo := fmt.Sprintf("unregister, error occur, grpc svrAddr is null")
log.Error(errInfo)
return errors.New(errInfo)
}
strNode := prefix + RootPath + svrAddr
exists, _, err := z.cli.Exists(strNode) exists, _, err := z.cli.Exists(strNode)
if err != nil { if err != nil {
log.Error("zk.Conn.Exists node:(%v), error:(%s)", strNode, err.Error()) log.Error("zk.Conn.Exists node:(%v), error:(%s)", strNode, err.Error())
@ -284,6 +257,8 @@ func (z *ZookeeperBuilder) unregister(ins *naming.Instance) (err error) {
} }
log.Info(fmt.Sprintf("unregister, client.Delete:(%v), appid:(%v), hostname:(%v) success", strNode, ins.AppID, ins.Hostname)) log.Info(fmt.Sprintf("unregister, client.Delete:(%v), appid:(%v), hostname:(%v) success", strNode, ins.AppID, ins.Hostname))
}
return return
} }
@ -336,7 +311,7 @@ func (a *appInfo) fetchstore(appID string) (err error) {
//for range childs //for range childs
for _, child := range childs { for _, child := range childs {
strNode = prefix + RootPath + child strNode = prefix + "/" + child
resp, _, err := a.zkb.cli.Get(strNode) resp, _, err := a.zkb.cli.Get(strNode)
if err != nil { if err != nil {
log.Error("zookeeper: fetch client.Get(%s) error(%v)", strNode, err) log.Error("zookeeper: fetch client.Get(%s) error(%v)", strNode, err)

Loading…
Cancel
Save