commit
15bcd04635
@ -0,0 +1,396 @@ |
||||
package zookeeper |
||||
|
||||
import ( |
||||
"context" |
||||
"encoding/json" |
||||
"errors" |
||||
"fmt" |
||||
"net/url" |
||||
"path" |
||||
"strings" |
||||
"sync" |
||||
"sync/atomic" |
||||
"time" |
||||
|
||||
"github.com/bilibili/kratos/pkg/log" |
||||
"github.com/bilibili/kratos/pkg/naming" |
||||
xtime "github.com/bilibili/kratos/pkg/time" |
||||
"github.com/go-zookeeper/zk" |
||||
) |
||||
|
||||
// Config is zookeeper config.
|
||||
type Config struct { |
||||
Root string `json:"root"` |
||||
Endpoints []string `json:"endpoints"` |
||||
Timeout xtime.Duration `json:"timeout"` |
||||
} |
||||
|
||||
var ( |
||||
_once sync.Once |
||||
_builder naming.Builder |
||||
|
||||
// ErrDuplication is a register duplication err
|
||||
ErrDuplication = errors.New("zookeeper: instance duplicate registration") |
||||
) |
||||
|
||||
// Builder return default zookeeper resolver builder.
|
||||
func Builder(c *Config) naming.Builder { |
||||
_once.Do(func() { |
||||
_builder, _ = New(c) |
||||
}) |
||||
return _builder |
||||
} |
||||
|
||||
// Build register resolver into default zookeeper.
|
||||
func Build(c *Config, id string) naming.Resolver { |
||||
return Builder(c).Build(id) |
||||
} |
||||
|
||||
type appInfo struct { |
||||
resolver map[*Resolve]struct{} |
||||
ins atomic.Value |
||||
zkb *Zookeeper |
||||
once sync.Once |
||||
} |
||||
|
||||
// Resolve zookeeper resolver.
|
||||
type Resolve struct { |
||||
id string |
||||
event chan struct{} |
||||
zkb *Zookeeper |
||||
} |
||||
|
||||
// Zookeeper is a zookeeper client Builder.
|
||||
// path: /{root}/{appid}/{ip} -> json(instance)
|
||||
type Zookeeper struct { |
||||
c *Config |
||||
cli *zk.Conn |
||||
connEvent <-chan zk.Event |
||||
ctx context.Context |
||||
cancelFunc context.CancelFunc |
||||
|
||||
mutex sync.RWMutex |
||||
apps map[string]*appInfo |
||||
registry map[string]struct{} |
||||
} |
||||
|
||||
// New is new a zookeeper builder.
|
||||
func New(c *Config) (zkb *Zookeeper, err error) { |
||||
if c.Timeout == 0 { |
||||
c.Timeout = xtime.Duration(time.Second) |
||||
} |
||||
if len(c.Endpoints) == 0 { |
||||
errInfo := fmt.Sprintf("zookeeper New failed, endpoints is null") |
||||
log.Error(errInfo) |
||||
return nil, errors.New(errInfo) |
||||
} |
||||
|
||||
zkConn, connEvent, err := zk.Connect(c.Endpoints, time.Duration(c.Timeout)) |
||||
if err != nil { |
||||
log.Error(fmt.Sprintf("zk Connect err:(%v)", err)) |
||||
return |
||||
} |
||||
log.Info(fmt.Sprintf("zk Connect ok!")) |
||||
|
||||
ctx, cancel := context.WithCancel(context.Background()) |
||||
zkb = &Zookeeper{ |
||||
c: c, |
||||
cli: zkConn, |
||||
connEvent: connEvent, |
||||
ctx: ctx, |
||||
cancelFunc: cancel, |
||||
apps: map[string]*appInfo{}, |
||||
registry: map[string]struct{}{}, |
||||
} |
||||
return |
||||
} |
||||
|
||||
// Build zookeeper resovler builder.
|
||||
func (z *Zookeeper) Build(appid string) naming.Resolver { |
||||
r := &Resolve{ |
||||
id: appid, |
||||
zkb: z, |
||||
event: make(chan struct{}, 1), |
||||
} |
||||
z.mutex.Lock() |
||||
app, ok := z.apps[appid] |
||||
if !ok { |
||||
app = &appInfo{ |
||||
resolver: make(map[*Resolve]struct{}), |
||||
zkb: z, |
||||
} |
||||
z.apps[appid] = app |
||||
} |
||||
app.resolver[r] = struct{}{} |
||||
z.mutex.Unlock() |
||||
if ok { |
||||
select { |
||||
case r.event <- struct{}{}: |
||||
default: |
||||
} |
||||
} |
||||
app.once.Do(func() { |
||||
go app.watch(appid) |
||||
}) |
||||
return r |
||||
} |
||||
|
||||
// Scheme return zookeeper's scheme.
|
||||
func (z *Zookeeper) Scheme() string { |
||||
return "zookeeper" |
||||
} |
||||
|
||||
// Register is register instance.
|
||||
func (z *Zookeeper) Register(ctx context.Context, ins *naming.Instance) (cancelFunc context.CancelFunc, err error) { |
||||
z.mutex.Lock() |
||||
if _, ok := z.registry[ins.AppID]; ok { |
||||
err = ErrDuplication |
||||
} else { |
||||
z.registry[ins.AppID] = struct{}{} |
||||
} |
||||
z.mutex.Unlock() |
||||
if err != nil { |
||||
return |
||||
} |
||||
ctx, cancel := context.WithCancel(z.ctx) |
||||
if err = z.register(ctx, ins); err != nil { |
||||
z.mutex.Lock() |
||||
delete(z.registry, ins.AppID) |
||||
z.mutex.Unlock() |
||||
cancel() |
||||
return |
||||
} |
||||
ch := make(chan struct{}, 1) |
||||
cancelFunc = context.CancelFunc(func() { |
||||
cancel() |
||||
<-ch |
||||
}) |
||||
go func() { |
||||
for { |
||||
select { |
||||
case connEvent := <-z.connEvent: |
||||
log.Info("watch zkClient state, connEvent:(%+v)", connEvent) |
||||
if connEvent.State == zk.StateHasSession { |
||||
if err = z.register(ctx, ins); err != nil { |
||||
log.Warn(fmt.Sprintf("watch zkClient state, fail to register node error:(%v)", err)) |
||||
continue |
||||
} |
||||
} |
||||
case <-ctx.Done(): |
||||
ch <- struct{}{} |
||||
return |
||||
} |
||||
} |
||||
}() |
||||
return |
||||
} |
||||
|
||||
func (z *Zookeeper) createPath(paths string) error { |
||||
var ( |
||||
lastPath = "/" |
||||
seps = strings.Split(paths, "/") |
||||
) |
||||
for _, part := range seps { |
||||
if part == "" { |
||||
continue |
||||
} |
||||
lastPath = path.Join(lastPath, part) |
||||
ok, _, err := z.cli.Exists(lastPath) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
if ok { |
||||
continue |
||||
} |
||||
ret, err := z.cli.Create(lastPath, nil, 0, zk.WorldACL(zk.PermAll)) |
||||
if err != nil { |
||||
log.Warn(fmt.Sprintf("createPath, fail to Create node:(%s). error:(%v)", paths, err)) |
||||
} else { |
||||
log.Info(fmt.Sprintf("createPath, succeed to Create node:(%s). retStr:(%s)", paths, ret)) |
||||
} |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (z *Zookeeper) registerPeerServer(nodePath string, ins *naming.Instance) (err error) { |
||||
var ( |
||||
str string |
||||
) |
||||
val, err := json.Marshal(ins) |
||||
if err != nil { |
||||
return |
||||
} |
||||
log.Info(fmt.Sprintf("registerPeerServer, ins after json.Marshal:(%v)", string(val))) |
||||
ok, _, err := z.cli.Exists(nodePath) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
if ok { |
||||
return nil |
||||
} |
||||
str, err = z.cli.Create(nodePath, val, zk.FlagEphemeral, zk.WorldACL(zk.PermAll)) |
||||
if err != nil { |
||||
log.Warn(fmt.Sprintf("registerPeerServer, fail to Create node:%s. error:(%v)", nodePath, err)) |
||||
} else { |
||||
log.Info(fmt.Sprintf("registerPeerServer, succeed to Create node:%s. retStr:(%s)", nodePath, str)) |
||||
} |
||||
return |
||||
} |
||||
|
||||
// register is register instance to zookeeper.
|
||||
func (z *Zookeeper) register(ctx context.Context, ins *naming.Instance) (err error) { |
||||
log.Info("zookeeper register enter, instance Addrs:(%v)", ins.Addrs) |
||||
|
||||
prefix := z.keyPrefix(ins.AppID) |
||||
if err = z.createPath(prefix); err != nil { |
||||
log.Warn(fmt.Sprintf("register, fail to createPath node error:(%v)", err)) |
||||
} |
||||
for _, addr := range ins.Addrs { |
||||
u, err := url.Parse(addr) |
||||
if err != nil { |
||||
continue |
||||
} |
||||
// grpc://127.0.0.1:8000 to 127.0.0.1
|
||||
nodePath := prefix + "/" + strings.SplitN(u.Host, ":", 2)[0] |
||||
if err = z.registerPeerServer(nodePath, ins); err != nil { |
||||
log.Warn(fmt.Sprintf("registerServer, fail to RegisterPeerServer node:%s error:(%v)", addr, err)) |
||||
} else { |
||||
log.Info(fmt.Sprintf("registerServer, succeed to RegistServer node.")) |
||||
} |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (z *Zookeeper) unregister(ins *naming.Instance) (err error) { |
||||
log.Info("zookeeper unregister enter, instance Addrs:(%v)", ins.Addrs) |
||||
prefix := z.keyPrefix(ins.AppID) |
||||
for _, addr := range ins.Addrs { |
||||
u, err := url.Parse(addr) |
||||
if err != nil { |
||||
continue |
||||
} |
||||
// grpc://127.0.0.1:8000 to 127.0.0.1
|
||||
nodePath := prefix + "/" + strings.SplitN(u.Host, ":", 2)[0] |
||||
exists, _, err := z.cli.Exists(nodePath) |
||||
if err != nil { |
||||
log.Error("zk.Conn.Exists node:(%v), error:(%v)", nodePath, err) |
||||
continue |
||||
} |
||||
if exists { |
||||
_, s, err := z.cli.Get(nodePath) |
||||
if err != nil { |
||||
log.Error("zk.Conn.Get node:(%s), error:(%v)", nodePath, err) |
||||
continue |
||||
} |
||||
if err = z.cli.Delete(nodePath, s.Version); err != nil { |
||||
log.Error("zk.Conn.Delete node:(%s), error:(%v)", nodePath, err) |
||||
continue |
||||
} |
||||
} |
||||
|
||||
log.Info(fmt.Sprintf("unregister, client.Delete:(%v), appid:(%v), hostname:(%v) success", nodePath, ins.AppID, ins.Hostname)) |
||||
} |
||||
return |
||||
} |
||||
|
||||
func (z *Zookeeper) keyPrefix(appID string) string { |
||||
return path.Join(z.c.Root, appID) |
||||
} |
||||
|
||||
// Close stop all running process including zk fetch and register.
|
||||
func (z *Zookeeper) Close() error { |
||||
z.cancelFunc() |
||||
return nil |
||||
} |
||||
|
||||
func (a *appInfo) watch(appID string) { |
||||
_ = a.fetchstore(appID) |
||||
go func() { |
||||
prefix := a.zkb.keyPrefix(appID) |
||||
for { |
||||
log.Info(fmt.Sprintf("zk ChildrenW enter, prefix:(%v)", prefix)) |
||||
snapshot, _, event, err := a.zkb.cli.ChildrenW(prefix) |
||||
if err != nil { |
||||
log.Error("zk ChildrenW fail to watch:%s error:(%v)", prefix, err) |
||||
time.Sleep(time.Second) |
||||
_ = a.fetchstore(appID) |
||||
continue |
||||
} |
||||
log.Info(fmt.Sprintf("zk ChildrenW ok, prefix:%s snapshot:(%v)", prefix, snapshot)) |
||||
for ev := range event { |
||||
log.Info(fmt.Sprintf("zk ChildrenW ok, prefix:(%v), event Path:(%v), Type:(%v)", prefix, ev.Path, ev.Type)) |
||||
if ev.Type == zk.EventNodeChildrenChanged { |
||||
_ = a.fetchstore(appID) |
||||
} |
||||
} |
||||
} |
||||
}() |
||||
} |
||||
|
||||
func (a *appInfo) fetchstore(appID string) (err error) { |
||||
prefix := a.zkb.keyPrefix(appID) |
||||
childs, _, err := a.zkb.cli.Children(prefix) |
||||
if err != nil { |
||||
log.Error(fmt.Sprintf("fetchstore, fail to get Children of node:(%v), error:(%v)", prefix, err)) |
||||
return |
||||
} |
||||
log.Info(fmt.Sprintf("fetchstore, ok to get Children of node:(%v), childs:(%v)", prefix, childs)) |
||||
ins := &naming.InstancesInfo{ |
||||
Instances: make(map[string][]*naming.Instance, 0), |
||||
} |
||||
for _, child := range childs { |
||||
nodePath := prefix + "/" + child |
||||
resp, _, err := a.zkb.cli.Get(nodePath) |
||||
if err != nil { |
||||
log.Error("zookeeper: fetch client.Get(%s) error:(%v)", nodePath, err) |
||||
return err |
||||
} |
||||
in := new(naming.Instance) |
||||
if err = json.Unmarshal(resp, in); err != nil { |
||||
return err |
||||
} |
||||
ins.Instances[in.Zone] = append(ins.Instances[in.Zone], in) |
||||
|
||||
} |
||||
a.store(ins) |
||||
return nil |
||||
} |
||||
|
||||
func (a *appInfo) store(ins *naming.InstancesInfo) { |
||||
a.ins.Store(ins) |
||||
a.zkb.mutex.RLock() |
||||
for rs := range a.resolver { |
||||
select { |
||||
case rs.event <- struct{}{}: |
||||
default: |
||||
} |
||||
} |
||||
a.zkb.mutex.RUnlock() |
||||
} |
||||
|
||||
// Watch watch instance.
|
||||
func (r *Resolve) Watch() <-chan struct{} { |
||||
return r.event |
||||
} |
||||
|
||||
// Fetch fetch resolver instance.
|
||||
func (r *Resolve) Fetch(ctx context.Context) (ins *naming.InstancesInfo, ok bool) { |
||||
r.zkb.mutex.RLock() |
||||
app, ok := r.zkb.apps[r.id] |
||||
r.zkb.mutex.RUnlock() |
||||
if ok { |
||||
ins, ok = app.ins.Load().(*naming.InstancesInfo) |
||||
return |
||||
} |
||||
return |
||||
} |
||||
|
||||
// Close close resolver.
|
||||
func (r *Resolve) Close() error { |
||||
r.zkb.mutex.Lock() |
||||
if app, ok := r.zkb.apps[r.id]; ok && len(app.resolver) != 0 { |
||||
delete(app.resolver, r) |
||||
} |
||||
r.zkb.mutex.Unlock() |
||||
return nil |
||||
} |
@ -0,0 +1,46 @@ |
||||
package zookeeper |
||||
|
||||
import ( |
||||
"context" |
||||
"testing" |
||||
|
||||
"github.com/bilibili/kratos/pkg/naming" |
||||
) |
||||
|
||||
var ( |
||||
_testAppid = "test_appid" |
||||
|
||||
_testConf = &Config{ |
||||
Root: "/test", |
||||
Endpoints: []string{"127.0.0.1:2181"}, |
||||
} |
||||
_testIns = &naming.Instance{ |
||||
AppID: _testAppid, |
||||
Addrs: []string{"grpc://127.0.0.1:9000"}, |
||||
Metadata: map[string]string{ |
||||
"test_key": "test_value", |
||||
}, |
||||
} |
||||
) |
||||
|
||||
func TestZookeeper(t *testing.T) { |
||||
zk, err := New(_testConf) |
||||
if err != nil { |
||||
t.Fatal(err) |
||||
} |
||||
_, err = zk.Register(context.TODO(), _testIns) |
||||
if err != nil { |
||||
t.Fatal(err) |
||||
} |
||||
// fetch&watch
|
||||
res := zk.Build(_testAppid) |
||||
event := res.Watch() |
||||
<-event |
||||
in, ok := res.Fetch(context.TODO()) |
||||
if !ok { |
||||
t.Fatal("failed to resolver fetch") |
||||
} |
||||
if len(in.Instances) != 1 { |
||||
t.Fatalf("Instances not match, got:%d want:1", len(in.Instances)) |
||||
} |
||||
} |
Loading…
Reference in new issue