chore(contrib/config): uniformly canceled by CancelFunc (#2111)

* chore: remove sentinel error for compatibility

* chore: core use lssentinel error

* chore: uniformly canceled by CancelFunc

* chore: remove error
pull/2115/head
Cluas 2 years ago committed by GitHub
parent 3aaac45e3d
commit 6fa5700c3c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      config/config.go
  2. 22
      contrib/config/apollo/watcher.go
  3. 27
      contrib/config/consul/watcher.go
  4. 30
      contrib/config/etcd/watcher.go
  5. 53
      contrib/config/nacos/config.go
  6. 65
      contrib/config/nacos/watcher.go

@ -20,8 +20,6 @@ var (
ErrNotFound = errors.New("key not found")
// ErrTypeAssert is type assert error.
ErrTypeAssert = errors.New("type assert error")
// ErrWatcherStopped means watcher is stopped.
ErrWatcherStopped = errors.New("warcher stopped")
_ Config = (*config)(nil)
)

@ -1,6 +1,8 @@
package apollo
import (
"context"
"github.com/go-kratos/kratos/v2/config"
"github.com/go-kratos/kratos/v2/log"
@ -8,7 +10,9 @@ import (
)
type watcher struct {
out <-chan []*config.KeyValue
out <-chan []*config.KeyValue
ctx context.Context
cancelFn func()
}
@ -47,28 +51,32 @@ func newWatcher(a *apollo) (config.Watcher, error) {
changeCh := make(chan []*config.KeyValue)
listener := &customChangeListener{in: changeCh, apollo: a}
a.client.AddChangeListener(listener)
ctx, cancel := context.WithCancel(context.Background())
return &watcher{
out: changeCh,
ctx: ctx,
cancelFn: func() {
a.client.RemoveChangeListener(listener)
close(changeCh)
cancel()
},
}, nil
}
// Next will be blocked until the Stop method is called
func (w *watcher) Next() ([]*config.KeyValue, error) {
kv, ok := <-w.out
if !ok {
return nil, config.ErrWatcherStopped
select {
case kv := <-w.out:
return kv, nil
case <-w.ctx.Done():
return nil, w.ctx.Err()
}
return kv, nil
}
func (w *watcher) Stop() error {
if w.cancelFn != nil {
w.cancelFn()
}
return nil
}

@ -1,16 +1,20 @@
package consul
import (
"context"
"github.com/go-kratos/kratos/v2/config"
"github.com/hashicorp/consul/api"
"github.com/hashicorp/consul/api/watch"
)
type watcher struct {
source *source
ch chan interface{}
closeChan chan struct{}
wp *watch.Plan
source *source
ch chan interface{}
wp *watch.Plan
ctx context.Context
cancel context.CancelFunc
}
func (w *watcher) handle(idx uint64, data interface{}) {
@ -27,10 +31,13 @@ func (w *watcher) handle(idx uint64, data interface{}) {
}
func newWatcher(s *source) (*watcher, error) {
ctx, cancel := context.WithCancel(context.Background())
w := &watcher{
source: s,
ch: make(chan interface{}),
closeChan: make(chan struct{}),
source: s,
ch: make(chan interface{}),
ctx: ctx,
cancel: cancel,
}
wp, err := watch.Parse(map[string]interface{}{"type": "keyprefix", "prefix": s.options.path})
@ -56,13 +63,13 @@ func (w *watcher) Next() ([]*config.KeyValue, error) {
select {
case <-w.ch:
return w.source.Load()
case <-w.closeChan:
return nil, config.ErrWatcherStopped
case <-w.ctx.Done():
return nil, w.ctx.Err()
}
}
func (w *watcher) Stop() error {
w.wp.Stop()
close(w.closeChan)
w.cancel()
return nil
}

@ -1,20 +1,26 @@
package etcd
import (
"context"
"github.com/go-kratos/kratos/v2/config"
clientv3 "go.etcd.io/etcd/client/v3"
)
type watcher struct {
source *source
ch clientv3.WatchChan
closeChan chan struct{}
source *source
ch clientv3.WatchChan
ctx context.Context
cancel context.CancelFunc
}
func newWatcher(s *source) *watcher {
ctx, cancel := context.WithCancel(context.Background())
w := &watcher{
source: s,
closeChan: make(chan struct{}),
source: s,
ctx: ctx,
cancel: cancel,
}
var opts []clientv3.OpOption
@ -26,19 +32,19 @@ func newWatcher(s *source) *watcher {
return w
}
func (s *watcher) Next() ([]*config.KeyValue, error) {
func (w *watcher) Next() ([]*config.KeyValue, error) {
select {
case resp := <-s.ch:
case resp := <-w.ch:
if resp.Err() != nil {
return nil, resp.Err()
}
return s.source.Load()
case <-s.closeChan:
return nil, config.ErrWatcherStopped
return w.source.Load()
case <-w.ctx.Done():
return nil, w.ctx.Err()
}
}
func (s *watcher) Stop() error {
close(s.closeChan)
func (w *watcher) Stop() error {
w.cancel()
return nil
}

@ -117,56 +117,3 @@ func (c *Config) Watch() (config.Watcher, error) {
}
return watcher, nil
}
type Watcher struct {
context.Context
dataID string
group string
content chan string
cancelListenConfig cancelListenConfigFunc
cancel context.CancelFunc
}
type cancelListenConfigFunc func(params vo.ConfigParam) (err error)
func newWatcher(ctx context.Context, dataID string, group string, cancelListenConfig cancelListenConfigFunc) *Watcher {
w := &Watcher{
dataID: dataID,
group: group,
cancelListenConfig: cancelListenConfig,
content: make(chan string, 100),
}
ctx, cancel := context.WithCancel(ctx)
w.Context = ctx
w.cancel = cancel
return w
}
func (w *Watcher) Next() ([]*config.KeyValue, error) {
select {
case <-w.Done():
return nil, config.ErrWatcherStopped
case content := <-w.content:
k := w.dataID
return []*config.KeyValue{
{
Key: k,
Value: []byte(content),
Format: strings.TrimPrefix(filepath.Ext(k), "."),
},
}, nil
}
}
func (w *Watcher) Close() error {
err := w.cancelListenConfig(vo.ConfigParam{
DataId: w.dataID,
Group: w.group,
})
w.cancel()
return err
}
func (w *Watcher) Stop() error {
return w.Close()
}

@ -0,0 +1,65 @@
package config
import (
"context"
"path/filepath"
"strings"
"github.com/go-kratos/kratos/v2/config"
"github.com/nacos-group/nacos-sdk-go/vo"
)
type Watcher struct {
dataID string
group string
content chan string
cancelListenConfig cancelListenConfigFunc
ctx context.Context
cancel context.CancelFunc
}
type cancelListenConfigFunc func(params vo.ConfigParam) (err error)
func newWatcher(ctx context.Context, dataID string, group string, cancelListenConfig cancelListenConfigFunc) *Watcher {
ctx, cancel := context.WithCancel(ctx)
w := &Watcher{
dataID: dataID,
group: group,
cancelListenConfig: cancelListenConfig,
content: make(chan string, 100),
ctx: ctx,
cancel: cancel,
}
return w
}
func (w *Watcher) Next() ([]*config.KeyValue, error) {
select {
case <-w.ctx.Done():
return nil, w.ctx.Err()
case content := <-w.content:
k := w.dataID
return []*config.KeyValue{
{
Key: k,
Value: []byte(content),
Format: strings.TrimPrefix(filepath.Ext(k), "."),
},
}, nil
}
}
func (w *Watcher) Close() error {
err := w.cancelListenConfig(vo.ConfigParam{
DataId: w.dataID,
Group: w.group,
})
w.cancel()
return err
}
func (w *Watcher) Stop() error {
return w.Close()
}
Loading…
Cancel
Save