You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
395 lines
9.4 KiB
395 lines
9.4 KiB
package zookeeper
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"net/url"
|
|
"path"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/bilibili/kratos/pkg/log"
|
|
"github.com/bilibili/kratos/pkg/naming"
|
|
xtime "github.com/bilibili/kratos/pkg/time"
|
|
"github.com/go-zookeeper/zk"
|
|
)
|
|
|
|
// Config is zookeeper config.
|
|
type Config struct {
|
|
Root string `json:"root"`
|
|
Endpoints []string `json:"endpoints"`
|
|
Timeout xtime.Duration `json:"timeout"`
|
|
}
|
|
|
|
var (
|
|
_once sync.Once
|
|
_builder naming.Builder
|
|
|
|
// ErrDuplication is a register duplication err
|
|
ErrDuplication = errors.New("zookeeper: instance duplicate registration")
|
|
)
|
|
|
|
// Builder return default zookeeper resolver builder.
|
|
func Builder(c *Config) naming.Builder {
|
|
_once.Do(func() {
|
|
_builder, _ = New(c)
|
|
})
|
|
return _builder
|
|
}
|
|
|
|
// Build register resolver into default zookeeper.
|
|
func Build(c *Config, id string) naming.Resolver {
|
|
return Builder(c).Build(id)
|
|
}
|
|
|
|
type appInfo struct {
|
|
resolver map[*Resolve]struct{}
|
|
ins atomic.Value
|
|
zkb *Zookeeper
|
|
once sync.Once
|
|
}
|
|
|
|
// Resolve zookeeper resolver.
|
|
type Resolve struct {
|
|
id string
|
|
event chan struct{}
|
|
zkb *Zookeeper
|
|
}
|
|
|
|
// Zookeeper is a zookeeper client Builder.
|
|
type Zookeeper struct {
|
|
c *Config
|
|
cli *zk.Conn
|
|
connEvent <-chan zk.Event
|
|
ctx context.Context
|
|
cancelFunc context.CancelFunc
|
|
|
|
mutex sync.RWMutex
|
|
apps map[string]*appInfo
|
|
registry map[string]struct{}
|
|
}
|
|
|
|
// New is new a zookeeper builder.
|
|
func New(c *Config) (zkb *Zookeeper, err error) {
|
|
if c.Timeout == 0 {
|
|
c.Timeout = xtime.Duration(time.Second)
|
|
}
|
|
if len(c.Endpoints) == 0 {
|
|
errInfo := fmt.Sprintf("zookeeper New failed, endpoints is null")
|
|
log.Error(errInfo)
|
|
return nil, errors.New(errInfo)
|
|
}
|
|
|
|
zkConn, connEvent, err := zk.Connect(c.Endpoints, time.Duration(c.Timeout))
|
|
if err != nil {
|
|
log.Error(fmt.Sprintf("zk Connect err:(%v)", err))
|
|
return
|
|
}
|
|
log.Info(fmt.Sprintf("zk Connect ok!"))
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
zkb = &Zookeeper{
|
|
c: c,
|
|
cli: zkConn,
|
|
connEvent: connEvent,
|
|
ctx: ctx,
|
|
cancelFunc: cancel,
|
|
apps: map[string]*appInfo{},
|
|
registry: map[string]struct{}{},
|
|
}
|
|
return
|
|
}
|
|
|
|
// Build zookeeper resovler builder.
|
|
func (z *Zookeeper) Build(appid string) naming.Resolver {
|
|
r := &Resolve{
|
|
id: appid,
|
|
zkb: z,
|
|
event: make(chan struct{}, 1),
|
|
}
|
|
z.mutex.Lock()
|
|
app, ok := z.apps[appid]
|
|
if !ok {
|
|
app = &appInfo{
|
|
resolver: make(map[*Resolve]struct{}),
|
|
zkb: z,
|
|
}
|
|
z.apps[appid] = app
|
|
}
|
|
app.resolver[r] = struct{}{}
|
|
z.mutex.Unlock()
|
|
if ok {
|
|
select {
|
|
case r.event <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
app.once.Do(func() {
|
|
go app.watch(appid)
|
|
})
|
|
return r
|
|
}
|
|
|
|
// Scheme return zookeeper's scheme.
|
|
func (z *Zookeeper) Scheme() string {
|
|
return "zookeeper"
|
|
}
|
|
|
|
// Register is register instance.
|
|
func (z *Zookeeper) Register(ctx context.Context, ins *naming.Instance) (cancelFunc context.CancelFunc, err error) {
|
|
z.mutex.Lock()
|
|
if _, ok := z.registry[ins.AppID]; ok {
|
|
err = ErrDuplication
|
|
} else {
|
|
z.registry[ins.AppID] = struct{}{}
|
|
}
|
|
z.mutex.Unlock()
|
|
if err != nil {
|
|
return
|
|
}
|
|
ctx, cancel := context.WithCancel(z.ctx)
|
|
if err = z.register(ctx, ins); err != nil {
|
|
z.mutex.Lock()
|
|
delete(z.registry, ins.AppID)
|
|
z.mutex.Unlock()
|
|
cancel()
|
|
return
|
|
}
|
|
ch := make(chan struct{}, 1)
|
|
cancelFunc = context.CancelFunc(func() {
|
|
cancel()
|
|
<-ch
|
|
})
|
|
go func() {
|
|
for {
|
|
select {
|
|
case connEvent := <-z.connEvent:
|
|
log.Info("watch zkClient state, connEvent:(%+v)", connEvent)
|
|
if connEvent.State == zk.StateHasSession {
|
|
if err = z.register(ctx, ins); err != nil {
|
|
log.Warn(fmt.Sprintf("watch zkClient state, fail to register node error:(%v)", err))
|
|
continue
|
|
}
|
|
}
|
|
case <-ctx.Done():
|
|
ch <- struct{}{}
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
return
|
|
}
|
|
|
|
func (z *Zookeeper) createPath(paths string) error {
|
|
var (
|
|
lastPath = "/"
|
|
seps = strings.Split(paths, "/")
|
|
)
|
|
for _, part := range seps {
|
|
if part == "" {
|
|
continue
|
|
}
|
|
lastPath = path.Join(lastPath, part)
|
|
ok, _, err := z.cli.Exists(lastPath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if ok {
|
|
continue
|
|
}
|
|
ret, err := z.cli.Create(lastPath, nil, 0, zk.WorldACL(zk.PermAll))
|
|
if err != nil {
|
|
log.Warn(fmt.Sprintf("createPath, fail to Create node:(%s). error:(%v)", paths, err))
|
|
} else {
|
|
log.Info(fmt.Sprintf("createPath, succeed to Create node:(%s). retStr:(%s)", paths, ret))
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (z *Zookeeper) registerPeerServer(nodePath string, ins *naming.Instance) (err error) {
|
|
var (
|
|
str string
|
|
)
|
|
val, err := json.Marshal(ins)
|
|
if err != nil {
|
|
return
|
|
}
|
|
log.Info(fmt.Sprintf("registerPeerServer, ins after json.Marshal:(%v)", string(val)))
|
|
ok, _, err := z.cli.Exists(nodePath)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if ok {
|
|
return nil
|
|
}
|
|
str, err = z.cli.Create(nodePath, val, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
|
|
if err != nil {
|
|
log.Warn(fmt.Sprintf("registerPeerServer, fail to Create node:%s. error:(%v)", nodePath, err))
|
|
} else {
|
|
log.Info(fmt.Sprintf("registerPeerServer, succeed to Create node:%s. retStr:(%s)", nodePath, str))
|
|
}
|
|
return
|
|
}
|
|
|
|
// register is register instance to zookeeper.
|
|
func (z *Zookeeper) register(ctx context.Context, ins *naming.Instance) (err error) {
|
|
log.Info("zookeeper register enter, instance Addrs:(%v)", ins.Addrs)
|
|
|
|
prefix := z.keyPrefix(ins.AppID)
|
|
if err = z.createPath(prefix); err != nil {
|
|
log.Warn(fmt.Sprintf("register, fail to createPath node error:(%v)", err))
|
|
}
|
|
for _, addr := range ins.Addrs {
|
|
u, err := url.Parse(addr)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
// grpc://127.0.0.1:8000 to 127.0.0.1
|
|
nodePath := prefix + "/" + strings.SplitN(u.Host, ":", 2)[0]
|
|
if err = z.registerPeerServer(nodePath, ins); err != nil {
|
|
log.Warn(fmt.Sprintf("registerServer, fail to RegisterPeerServer node:%s error:(%v)", addr, err))
|
|
} else {
|
|
log.Info(fmt.Sprintf("registerServer, succeed to RegistServer node."))
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (z *Zookeeper) unregister(ins *naming.Instance) (err error) {
|
|
log.Info("zookeeper unregister enter, instance Addrs:(%v)", ins.Addrs)
|
|
prefix := z.keyPrefix(ins.AppID)
|
|
for _, addr := range ins.Addrs {
|
|
u, err := url.Parse(addr)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
// grpc://127.0.0.1:8000 to 127.0.0.1
|
|
nodePath := prefix + "/" + strings.SplitN(u.Host, ":", 2)[0]
|
|
exists, _, err := z.cli.Exists(nodePath)
|
|
if err != nil {
|
|
log.Error("zk.Conn.Exists node:(%v), error:(%v)", nodePath, err)
|
|
continue
|
|
}
|
|
if exists {
|
|
_, s, err := z.cli.Get(nodePath)
|
|
if err != nil {
|
|
log.Error("zk.Conn.Get node:(%s), error:(%v)", nodePath, err)
|
|
continue
|
|
}
|
|
if err = z.cli.Delete(nodePath, s.Version); err != nil {
|
|
log.Error("zk.Conn.Delete node:(%s), error:(%v)", nodePath, err)
|
|
continue
|
|
}
|
|
}
|
|
|
|
log.Info(fmt.Sprintf("unregister, client.Delete:(%v), appid:(%v), hostname:(%v) success", nodePath, ins.AppID, ins.Hostname))
|
|
}
|
|
return
|
|
}
|
|
|
|
func (z *Zookeeper) keyPrefix(appID string) string {
|
|
return path.Join(z.c.Root, appID)
|
|
}
|
|
|
|
// Close stop all running process including zk fetch and register.
|
|
func (z *Zookeeper) Close() error {
|
|
z.cancelFunc()
|
|
return nil
|
|
}
|
|
|
|
func (a *appInfo) watch(appID string) {
|
|
_ = a.fetchstore(appID)
|
|
go func() {
|
|
prefix := a.zkb.keyPrefix(appID)
|
|
for {
|
|
log.Info(fmt.Sprintf("zk ChildrenW enter, prefix:(%v)", prefix))
|
|
snapshot, _, event, err := a.zkb.cli.ChildrenW(prefix)
|
|
if err != nil {
|
|
log.Error("zk ChildrenW fail to watch:%s error:(%v)", prefix, err)
|
|
time.Sleep(time.Second)
|
|
_ = a.fetchstore(appID)
|
|
continue
|
|
}
|
|
log.Info(fmt.Sprintf("zk ChildrenW ok, prefix:%s snapshot:(%v)", prefix, snapshot))
|
|
for ev := range event {
|
|
log.Info(fmt.Sprintf("zk ChildrenW ok, prefix:(%v), event Path:(%v), Type:(%v)", prefix, ev.Path, ev.Type))
|
|
if ev.Type == zk.EventNodeChildrenChanged {
|
|
_ = a.fetchstore(appID)
|
|
}
|
|
}
|
|
}
|
|
}()
|
|
}
|
|
|
|
func (a *appInfo) fetchstore(appID string) (err error) {
|
|
prefix := a.zkb.keyPrefix(appID)
|
|
childs, _, err := a.zkb.cli.Children(prefix)
|
|
if err != nil {
|
|
log.Error(fmt.Sprintf("fetchstore, fail to get Children of node:(%v), error:(%v)", prefix, err))
|
|
return
|
|
}
|
|
log.Info(fmt.Sprintf("fetchstore, ok to get Children of node:(%v), childs:(%v)", prefix, childs))
|
|
ins := &naming.InstancesInfo{
|
|
Instances: make(map[string][]*naming.Instance, 0),
|
|
}
|
|
for _, child := range childs {
|
|
nodePath := prefix + "/" + child
|
|
resp, _, err := a.zkb.cli.Get(nodePath)
|
|
if err != nil {
|
|
log.Error("zookeeper: fetch client.Get(%s) error:(%v)", nodePath, err)
|
|
return err
|
|
}
|
|
in := new(naming.Instance)
|
|
if err = json.Unmarshal(resp, in); err != nil {
|
|
return err
|
|
}
|
|
ins.Instances[in.Zone] = append(ins.Instances[in.Zone], in)
|
|
|
|
}
|
|
a.store(ins)
|
|
return nil
|
|
}
|
|
|
|
func (a *appInfo) store(ins *naming.InstancesInfo) {
|
|
a.ins.Store(ins)
|
|
a.zkb.mutex.RLock()
|
|
for rs := range a.resolver {
|
|
select {
|
|
case rs.event <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
a.zkb.mutex.RUnlock()
|
|
}
|
|
|
|
// Watch watch instance.
|
|
func (r *Resolve) Watch() <-chan struct{} {
|
|
return r.event
|
|
}
|
|
|
|
// Fetch fetch resolver instance.
|
|
func (r *Resolve) Fetch(ctx context.Context) (ins *naming.InstancesInfo, ok bool) {
|
|
r.zkb.mutex.RLock()
|
|
app, ok := r.zkb.apps[r.id]
|
|
r.zkb.mutex.RUnlock()
|
|
if ok {
|
|
ins, ok = app.ins.Load().(*naming.InstancesInfo)
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
// Close close resolver.
|
|
func (r *Resolve) Close() error {
|
|
r.zkb.mutex.Lock()
|
|
if app, ok := r.zkb.apps[r.id]; ok && len(app.resolver) != 0 {
|
|
delete(app.resolver, r)
|
|
}
|
|
r.zkb.mutex.Unlock()
|
|
return nil
|
|
}
|
|
|