fix watch event

pull/412/head
Tony 5 years ago
parent 264e0d6c06
commit 4b11f44419
  1. 18
      pkg/naming/zookeeper/zookeeper.go
  2. 10
      pkg/naming/zookeeper/zookeeper_test.go

@ -249,6 +249,7 @@ func (z *Zookeeper) register(ctx context.Context, ins *naming.Instance) (err err
if err != nil { if err != nil {
continue continue
} }
// grpc://127.0.0.1:8000 to 127.0.0.1
nodePath := prefix + "/" + strings.SplitN(u.Host, ":", 2)[0] nodePath := prefix + "/" + strings.SplitN(u.Host, ":", 2)[0]
if err = z.registerPeerServer(nodePath, ins); err != nil { if err = z.registerPeerServer(nodePath, ins); err != nil {
log.Warn(fmt.Sprintf("registerServer, fail to RegisterPeerServer node:%s error:(%v)", addr, err)) log.Warn(fmt.Sprintf("registerServer, fail to RegisterPeerServer node:%s error:(%v)", addr, err))
@ -267,6 +268,7 @@ func (z *Zookeeper) unregister(ins *naming.Instance) (err error) {
if err != nil { if err != nil {
continue continue
} }
// grpc://127.0.0.1:8000 to 127.0.0.1
nodePath := prefix + "/" + strings.SplitN(u.Host, ":", 2)[0] nodePath := prefix + "/" + strings.SplitN(u.Host, ":", 2)[0]
exists, _, err := z.cli.Exists(nodePath) exists, _, err := z.cli.Exists(nodePath)
if err != nil { if err != nil {
@ -310,6 +312,7 @@ func (a *appInfo) watch(appID string) {
if err != nil { if err != nil {
log.Error("zk ChildrenW fail to watch:%s error:(%v)", prefix, err) log.Error("zk ChildrenW fail to watch:%s error:(%v)", prefix, err)
time.Sleep(time.Second) time.Sleep(time.Second)
_ = a.fetchstore(appID)
continue continue
} }
log.Info(fmt.Sprintf("zk ChildrenW ok, prefix:%s snapshot:(%v)", prefix, snapshot)) log.Info(fmt.Sprintf("zk ChildrenW ok, prefix:%s snapshot:(%v)", prefix, snapshot))
@ -325,27 +328,24 @@ func (a *appInfo) watch(appID string) {
func (a *appInfo) fetchstore(appID string) (err error) { func (a *appInfo) fetchstore(appID string) (err error) {
prefix := a.zkb.keyPrefix(appID) prefix := a.zkb.keyPrefix(appID)
a.zkb.createPath(prefix)
childs, _, err := a.zkb.cli.Children(prefix) childs, _, err := a.zkb.cli.Children(prefix)
if err != nil { if err != nil {
log.Error(fmt.Sprintf("fetchstore, fail to get Children of node:(%v), error:(%v)", prefix, err)) log.Error(fmt.Sprintf("fetchstore, fail to get Children of node:(%v), error:(%v)", prefix, err))
} else { return
log.Info(fmt.Sprintf("fetchstore, ok to get Children of node:(%v), childs:(%v)", prefix, childs))
} }
log.Info(fmt.Sprintf("fetchstore, ok to get Children of node:(%v), childs:(%v)", prefix, childs))
ins := &naming.InstancesInfo{ ins := &naming.InstancesInfo{
Instances: make(map[string][]*naming.Instance, 0), Instances: make(map[string][]*naming.Instance, 0),
} }
strNode := ""
for _, child := range childs { for _, child := range childs {
strNode = prefix + "/" + child nodePath := prefix + "/" + child
resp, _, err := a.zkb.cli.Get(strNode) resp, _, err := a.zkb.cli.Get(nodePath)
if err != nil { if err != nil {
log.Error("zookeeper: fetch client.Get(%s) error:(%v)", strNode, err) log.Error("zookeeper: fetch client.Get(%s) error:(%v)", nodePath, err)
return err return err
} }
in := new(naming.Instance) in := new(naming.Instance)
err = json.Unmarshal(resp, in) if err = json.Unmarshal(resp, in); err != nil {
if err != nil {
return err return err
} }
ins.Instances[in.Zone] = append(ins.Instances[in.Zone], in) ins.Instances[in.Zone] = append(ins.Instances[in.Zone], in)

@ -3,7 +3,6 @@ package zookeeper
import ( import (
"context" "context"
"testing" "testing"
"time"
"github.com/bilibili/kratos/pkg/naming" "github.com/bilibili/kratos/pkg/naming"
) )
@ -30,16 +29,13 @@ func TestZookeeper(t *testing.T) {
t.Fatal(err) t.Fatal(err)
} }
res := zk.Build(_testAppid) res := zk.Build(_testAppid)
go func() { event := res.Watch()
for event := range res.Watch() {
t.Log(event)
}
}()
_, err = zk.Register(context.TODO(), _testIns) _, err = zk.Register(context.TODO(), _testIns)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
time.Sleep(time.Second * 3) <-event
<-event
in, ok := res.Fetch(context.TODO()) in, ok := res.Fetch(context.TODO())
if !ok { if !ok {
t.Fatal("failed to resolver fetch") t.Fatal("failed to resolver fetch")

Loading…
Cancel
Save