fix(contrib/registry/etcd):part of #1430,fix contrib/registry/etcd (#1441)

pull/1423/head
Casper-Mars 3 years ago committed by GitHub
parent aa5743147c
commit 73ce2941bd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      contrib/registry/etcd/registry.go
  2. 44
      contrib/registry/etcd/registry_test.go
  3. 20
      contrib/registry/etcd/watcher.go
  4. 1
      hack/.lintcheck_failures

@ -109,13 +109,13 @@ func (r *Registry) GetService(ctx context.Context, name string) ([]*registry.Ser
if err != nil { if err != nil {
return nil, err return nil, err
} }
var items []*registry.ServiceInstance items := make([]*registry.ServiceInstance, len(resp.Kvs))
for _, kv := range resp.Kvs { for i, kv := range resp.Kvs {
si, err := unmarshal(kv.Value) si, err := unmarshal(kv.Value)
if err != nil { if err != nil {
return nil, err return nil, err
} }
items = append(items, si) items[i] = si
} }
return items, nil return items, nil
} }
@ -123,7 +123,7 @@ func (r *Registry) GetService(ctx context.Context, name string) ([]*registry.Ser
// Watch creates a watcher according to the service name. // Watch creates a watcher according to the service name.
func (r *Registry) Watch(ctx context.Context, name string) (registry.Watcher, error) { func (r *Registry) Watch(ctx context.Context, name string) (registry.Watcher, error) {
key := fmt.Sprintf("%s/%s", r.opts.namespace, name) key := fmt.Sprintf("%s/%s", r.opts.namespace, name)
return newWatcher(ctx, key, r.client), nil return newWatcher(ctx, key, r.client)
} }
// registerWithKV create a new lease, return current leaseID // registerWithKV create a new lease, return current leaseID
@ -161,9 +161,9 @@ func (r *Registry) heartBeat(ctx context.Context, leaseID clientv3.LeaseID, key
cancelCtx, cancel := context.WithCancel(ctx) cancelCtx, cancel := context.WithCancel(ctx)
go func() { go func() {
defer cancel() defer cancel()
id, err := r.registerWithKV(cancelCtx, key, value) id, registerErr := r.registerWithKV(cancelCtx, key, value)
if err != nil { if registerErr != nil {
errChan <- err errChan <- registerErr
} else { } else {
idChan <- id idChan <- id
} }

@ -3,17 +3,20 @@ package etcd
import ( import (
"context" "context"
"fmt" "fmt"
"google.golang.org/grpc"
"testing" "testing"
"time" "time"
"google.golang.org/grpc"
"github.com/go-kratos/kratos/v2/registry" "github.com/go-kratos/kratos/v2/registry"
clientv3 "go.etcd.io/etcd/client/v3" clientv3 "go.etcd.io/etcd/client/v3"
) )
func TestRegistry(t *testing.T) { func TestRegistry(t *testing.T) {
client, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}, client, err := clientv3.New(clientv3.Config{
DialTimeout: time.Second, DialOptions: []grpc.DialOption{grpc.WithBlock()}}) Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: time.Second, DialOptions: []grpc.DialOption{grpc.WithBlock()},
})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -30,11 +33,13 @@ func TestRegistry(t *testing.T) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
defer w.Stop() defer func() {
_ = w.Stop()
}()
go func() { go func() {
for { for {
res, err := w.Next() res, err1 := w.Next()
if err != nil { if err1 != nil {
return return
} }
t.Logf("watch: %d", len(res)) t.Logf("watch: %d", len(res))
@ -45,8 +50,8 @@ func TestRegistry(t *testing.T) {
}() }()
time.Sleep(time.Second) time.Sleep(time.Second)
if err := r.Register(ctx, s); err != nil { if err1 := r.Register(ctx, s); err1 != nil {
t.Fatal(err) t.Fatal(err1)
} }
time.Sleep(time.Second) time.Sleep(time.Second)
@ -58,8 +63,8 @@ func TestRegistry(t *testing.T) {
t.Errorf("not expected: %+v", res) t.Errorf("not expected: %+v", res)
} }
if err := r.Deregister(ctx, s); err != nil { if err1 := r.Deregister(ctx, s); err1 != nil {
t.Fatal(err) t.Fatal(err1)
} }
time.Sleep(time.Second) time.Sleep(time.Second)
@ -73,8 +78,10 @@ func TestRegistry(t *testing.T) {
} }
func TestHeartBeat(t *testing.T) { func TestHeartBeat(t *testing.T) {
client, err := clientv3.New(clientv3.Config{Endpoints: []string{"127.0.0.1:2379"}, client, err := clientv3.New(clientv3.Config{
DialTimeout: time.Second, DialOptions: []grpc.DialOption{grpc.WithBlock()}}) Endpoints: []string{"127.0.0.1:2379"},
DialTimeout: time.Second, DialOptions: []grpc.DialOption{grpc.WithBlock()},
})
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -88,14 +95,16 @@ func TestHeartBeat(t *testing.T) {
go func() { go func() {
r := New(client) r := New(client)
w, err := r.Watch(ctx, s.Name) w, err1 := r.Watch(ctx, s.Name)
if err != nil { if err1 != nil {
return return
} }
defer w.Stop() defer func() {
_ = w.Stop()
}()
for { for {
res, err := w.Next() res, err2 := w.Next()
if err != nil { if err2 != nil {
return return
} }
t.Logf("watch: %d", len(res)) t.Logf("watch: %d", len(res))
@ -141,5 +150,4 @@ func TestHeartBeat(t *testing.T) {
if len(res) == 0 { if len(res) == 0 {
t.Errorf("reconnect failed") t.Errorf("reconnect failed")
} }
} }

@ -7,9 +7,7 @@ import (
clientv3 "go.etcd.io/etcd/client/v3" clientv3 "go.etcd.io/etcd/client/v3"
) )
var ( var _ registry.Watcher = &watcher{}
_ registry.Watcher = &watcher{}
)
type watcher struct { type watcher struct {
key string key string
@ -21,7 +19,7 @@ type watcher struct {
first bool first bool
} }
func newWatcher(ctx context.Context, key string, client *clientv3.Client) *watcher { func newWatcher(ctx context.Context, key string, client *clientv3.Client) (*watcher, error) {
w := &watcher{ w := &watcher{
key: key, key: key,
watcher: clientv3.NewWatcher(client), watcher: clientv3.NewWatcher(client),
@ -30,8 +28,11 @@ func newWatcher(ctx context.Context, key string, client *clientv3.Client) *watch
} }
w.ctx, w.cancel = context.WithCancel(ctx) w.ctx, w.cancel = context.WithCancel(ctx)
w.watchChan = w.watcher.Watch(w.ctx, key, clientv3.WithPrefix(), clientv3.WithRev(0)) w.watchChan = w.watcher.Watch(w.ctx, key, clientv3.WithPrefix(), clientv3.WithRev(0))
w.watcher.RequestProgress(context.Background()) err := w.watcher.RequestProgress(context.Background())
return w if err != nil {
return nil, err
}
return w, nil
} }
func (w *watcher) Next() ([]*registry.ServiceInstance, error) { func (w *watcher) Next() ([]*registry.ServiceInstance, error) {
@ -47,7 +48,6 @@ func (w *watcher) Next() ([]*registry.ServiceInstance, error) {
case <-w.watchChan: case <-w.watchChan:
return w.getInstance() return w.getInstance()
} }
} }
func (w *watcher) Stop() error { func (w *watcher) Stop() error {
@ -60,13 +60,13 @@ func (w *watcher) getInstance() ([]*registry.ServiceInstance, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
var items []*registry.ServiceInstance items := make([]*registry.ServiceInstance, len(resp.Kvs))
for _, kv := range resp.Kvs { for i, kv := range resp.Kvs {
si, err := unmarshal(kv.Value) si, err := unmarshal(kv.Value)
if err != nil { if err != nil {
return nil, err return nil, err
} }
items = append(items, si) items[i] = si
} }
return items, nil return items, nil
} }

@ -9,4 +9,3 @@
./contrib/registry/consul ./contrib/registry/consul
./contrib/registry/kubernetes ./contrib/registry/kubernetes
./contrib/registry/zookeeper ./contrib/registry/zookeeper
./contrib/registry/etcd

Loading…
Cancel
Save