fix(contrib/registry/etcd): rewatch when error occur (#2603)

* fix(contrib/registry/etcd): rewatch when error occur

* fix(contrib/registry/etcd): close watcher before renew

* fix(contrib/registry/etcd): use w.ctx

Co-authored-by: zhangweili <zhangweili@bilibili.com>
pull/2604/head
baeNewJeans 2 years ago committed by GitHub
parent 480b16b817
commit 65c51594f9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 21
      contrib/registry/etcd/watcher.go

@ -2,6 +2,7 @@ package etcd
import ( import (
"context" "context"
"time"
clientv3 "go.etcd.io/etcd/client/v3" clientv3 "go.etcd.io/etcd/client/v3"
@ -14,6 +15,7 @@ type watcher struct {
key string key string
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
client *clientv3.Client
watchChan clientv3.WatchChan watchChan clientv3.WatchChan
watcher clientv3.Watcher watcher clientv3.Watcher
kv clientv3.KV kv clientv3.KV
@ -24,6 +26,7 @@ type watcher struct {
func newWatcher(ctx context.Context, key, name string, client *clientv3.Client) (*watcher, error) { func newWatcher(ctx context.Context, key, name string, client *clientv3.Client) (*watcher, error) {
w := &watcher{ w := &watcher{
key: key, key: key,
client: client,
watcher: clientv3.NewWatcher(client), watcher: clientv3.NewWatcher(client),
kv: clientv3.NewKV(client), kv: clientv3.NewKV(client),
first: true, first: true,
@ -31,7 +34,7 @@ func newWatcher(ctx context.Context, key, name string, client *clientv3.Client)
} }
w.ctx, w.cancel = context.WithCancel(ctx) w.ctx, w.cancel = context.WithCancel(ctx)
w.watchChan = w.watcher.Watch(w.ctx, key, clientv3.WithPrefix(), clientv3.WithRev(0), clientv3.WithKeysOnly()) w.watchChan = w.watcher.Watch(w.ctx, key, clientv3.WithPrefix(), clientv3.WithRev(0), clientv3.WithKeysOnly())
err := w.watcher.RequestProgress(context.Background()) err := w.watcher.RequestProgress(w.ctx)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -48,7 +51,14 @@ func (w *watcher) Next() ([]*registry.ServiceInstance, error) {
select { select {
case <-w.ctx.Done(): case <-w.ctx.Done():
return nil, w.ctx.Err() return nil, w.ctx.Err()
case <-w.watchChan: case watchResp, ok := <-w.watchChan:
if !ok || watchResp.Err() != nil {
time.Sleep(time.Second)
err := w.reWatch()
if err != nil {
return nil, err
}
}
return w.getInstance() return w.getInstance()
} }
} }
@ -76,3 +86,10 @@ func (w *watcher) getInstance() ([]*registry.ServiceInstance, error) {
} }
return items, nil return items, nil
} }
func (w *watcher) reWatch() error {
w.watcher.Close()
w.watcher = clientv3.NewWatcher(w.client)
w.watchChan = w.watcher.Watch(w.ctx, w.key, clientv3.WithPrefix(), clientv3.WithRev(0), clientv3.WithKeysOnly())
return w.watcher.RequestProgress(w.ctx)
}

Loading…
Cancel
Save