You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
kratos/config/config.go

145 lines
2.8 KiB

4 years ago
package config
import (
"encoding/json"
"errors"
"reflect"
"sync"
"time"
"github.com/go-kratos/kratos/v2/encoding"
4 years ago
"github.com/go-kratos/kratos/v2/log"
)
var (
// ErrNotFound is key not found.
ErrNotFound = errors.New("key not found")
// ErrTypeAssert is type assert error.
ErrTypeAssert = errors.New("type assert error")
_ Config = (*config)(nil)
codec = encoding.GetCodec("json")
4 years ago
)
// Observer is config observer.
type Observer func(string, Value)
// Config is a config interface.
type Config interface {
Load() error
Scan(v interface{}) error
Value(key string) Value
Watch(key string, o Observer) error
Close() error
}
type config struct {
opts options
reader Reader
cached sync.Map
observers sync.Map
watchers []Watcher
log *log.Helper
}
// New new a config with options.
func New(opts ...Option) Config {
options := options{
logger: log.DefaultLogger,
decoder: func(kv *KeyValue, v map[string]interface{}) error {
return json.Unmarshal(kv.Value, &v)
},
}
for _, o := range opts {
o(&options)
}
return &config{
opts: options,
reader: newReader(options),
log: log.NewHelper("config", options.logger),
}
}
func (c *config) watch(w Watcher) {
for {
kvs, err := w.Next()
if err != nil {
time.Sleep(time.Second)
c.log.Errorf("Failed to watch next config: %v", err)
continue
}
if err := c.reader.Merge(kvs...); err != nil {
c.log.Errorf("Failed to merge next config: %v", err)
continue
}
c.cached.Range(func(key, value interface{}) bool {
k := key.(string)
v := value.(Value)
if n, ok := c.reader.Value(k); ok && !reflect.DeepEqual(n.Load(), v.Load()) {
v.Store(n.Load())
if o, ok := c.observers.Load(k); ok {
o.(Observer)(k, v)
}
}
return true
})
}
}
func (c *config) Load() error {
for _, src := range c.opts.sources {
kvs, err := src.Load()
if err != nil {
return err
}
if err := c.reader.Merge(kvs...); err != nil {
c.log.Errorf("Failed to merge config source: %v", err)
return err
}
w, err := src.Watch()
if err != nil {
c.log.Errorf("Failed to watch config source: %v", err)
return err
}
go c.watch(w)
}
return nil
}
func (c *config) Value(key string) Value {
if v, ok := c.cached.Load(key); ok {
return v.(Value)
}
if v, ok := c.reader.Value(key); ok {
c.cached.Store(key, v)
return v
}
return &errValue{err: ErrNotFound}
}
func (c *config) Scan(v interface{}) error {
data, err := c.reader.Source()
if err != nil {
return err
}
return codec.Unmarshal(data, v)
4 years ago
}
func (c *config) Watch(key string, o Observer) error {
if v := c.Value(key); v.Load() == nil {
return ErrNotFound
}
c.observers.Store(key, o)
return nil
}
func (c *config) Close() error {
for _, w := range c.watchers {
if err := w.Close(); err != nil {
return err
}
}
return nil
}