feat(config): add ErrorWatcherStopped and return on watcher.Next() after Watcher.Stop() (#2092)

* feat(config): add ErrorWatcherStopped and return watcher.Next() after watcher.Stop()

* fix: remove unreachable code

* fix: return err when etcd WatcheResponse hold error
pull/2105/head
Cluas 2 years ago committed by GitHub
parent 752f011ba1
commit 4f8d8ef8da
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      config/config.go
  2. 4
      contrib/config/apollo/watcher.go
  3. 7
      contrib/config/consul/watcher.go
  4. 8
      contrib/config/etcd/watcher.go
  5. 6
      contrib/config/nacos/config.go

@ -20,6 +20,8 @@ var (
ErrNotFound = errors.New("key not found") ErrNotFound = errors.New("key not found")
// ErrTypeAssert is type assert error. // ErrTypeAssert is type assert error.
ErrTypeAssert = errors.New("type assert error") ErrTypeAssert = errors.New("type assert error")
// ErrWatcherStopped means watcher is stopped.
ErrWatcherStopped = errors.New("warcher stopped")
_ Config = (*config)(nil) _ Config = (*config)(nil)
) )

@ -1,8 +1,6 @@
package apollo package apollo
import ( import (
"context"
"github.com/go-kratos/kratos/v2/config" "github.com/go-kratos/kratos/v2/config"
"github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/log"
@ -62,7 +60,7 @@ func newWatcher(a *apollo) (config.Watcher, error) {
func (w *watcher) Next() ([]*config.KeyValue, error) { func (w *watcher) Next() ([]*config.KeyValue, error) {
kv, ok := <-w.out kv, ok := <-w.out
if !ok { if !ok {
return nil, context.Canceled return nil, config.ErrWatcherStopped
} }
return kv, nil return kv, nil
} }

@ -54,13 +54,10 @@ func newWatcher(s *source) (*watcher, error) {
func (w *watcher) Next() ([]*config.KeyValue, error) { func (w *watcher) Next() ([]*config.KeyValue, error) {
select { select {
case _, ok := <-w.ch: case <-w.ch:
if !ok {
return nil, nil
}
return w.source.Load() return w.source.Load()
case <-w.closeChan: case <-w.closeChan:
return nil, nil return nil, config.ErrWatcherStopped
} }
} }

@ -28,13 +28,13 @@ func newWatcher(s *source) *watcher {
func (s *watcher) Next() ([]*config.KeyValue, error) { func (s *watcher) Next() ([]*config.KeyValue, error) {
select { select {
case _, ok := <-s.ch: case resp := <-s.ch:
if !ok { if resp.Err() != nil {
return nil, nil return nil, resp.Err()
} }
return s.source.Load() return s.source.Load()
case <-s.closeChan: case <-s.closeChan:
return nil, nil return nil, config.ErrWatcherStopped
} }
} }

@ -106,7 +106,7 @@ func (c *Config) Watch() (config.Watcher, error) {
err := c.client.ListenConfig(vo.ConfigParam{ err := c.client.ListenConfig(vo.ConfigParam{
DataId: c.opts.dataID, DataId: c.opts.dataID,
Group: c.opts.group, Group: c.opts.group,
OnChange: func(namespace, group, dataId, data string) { OnChange: func(_, group, dataId, data string) {
if dataId == watcher.dataID && group == watcher.group { if dataId == watcher.dataID && group == watcher.group {
watcher.content <- data watcher.content <- data
} }
@ -144,8 +144,8 @@ func newWatcher(ctx context.Context, dataID string, group string, cancelListenCo
func (w *Watcher) Next() ([]*config.KeyValue, error) { func (w *Watcher) Next() ([]*config.KeyValue, error) {
select { select {
case <-w.Context.Done(): case <-w.Done():
return nil, nil return nil, config.ErrWatcherStopped
case content := <-w.content: case content := <-w.content:
k := w.dataID k := w.dataID
return []*config.KeyValue{ return []*config.KeyValue{

Loading…
Cancel
Save