fix(config): references between different sources are consistent when watching

Refs #2801
pull/2868/head
GuangZhan 1 year ago
parent 32b1d13f90
commit 93532ae621
No known key found for this signature in database
GPG Key ID: 7E6B84F82A98876B
  1. 33
      config/config.go
  2. 71
      config/config_test.go

@ -3,6 +3,7 @@ package config
import ( import (
"context" "context"
"errors" "errors"
"fmt"
"reflect" "reflect"
"sync" "sync"
"time" "time"
@ -60,8 +61,28 @@ func New(opts ...Option) Config {
} }
func (c *config) watch(w Watcher) { func (c *config) watch(w Watcher) {
var reloadReader = func(opts options) (Reader, error) {
r := newReader(c.opts)
for _, src := range c.opts.sources {
kvs, err := src.Load()
if err != nil {
return nil, fmt.Errorf("failed to load config source: %w", err)
}
for _, v := range kvs {
log.Debugf("config loaded: %s format: %s", v.Key, v.Format)
}
if err = r.Merge(kvs...); err != nil {
return nil, fmt.Errorf("failed to merge config source: %w", err)
}
}
if err := r.Resolve(); err != nil {
return nil, fmt.Errorf("failed to resolve config: %w", err)
}
return r, nil
}
for { for {
kvs, err := w.Next() _, err := w.Next()
if err != nil { if err != nil {
if errors.Is(err, context.Canceled) { if errors.Is(err, context.Canceled) {
log.Infof("watcher's ctx cancel : %v", err) log.Infof("watcher's ctx cancel : %v", err)
@ -71,14 +92,12 @@ func (c *config) watch(w Watcher) {
log.Errorf("failed to watch next config: %v", err) log.Errorf("failed to watch next config: %v", err)
continue continue
} }
if err := c.reader.Merge(kvs...); err != nil { r, err := reloadReader(c.opts)
log.Errorf("failed to merge next config: %v", err) if err != nil {
continue log.Errorf("failed to reload reader all sources: %v", err)
}
if err := c.reader.Resolve(); err != nil {
log.Errorf("failed to resolve next config: %v", err)
continue continue
} }
c.reader = r
c.cached.Range(func(key, value interface{}) bool { c.cached.Range(func(key, value interface{}) bool {
k := key.(string) k := key.(string)
v := value.(Value) v := value.(Value)

@ -182,3 +182,74 @@ func TestConfig(t *testing.T) {
t.Error("len(testConf.Endpoints) is not equal to 2") t.Error("len(testConf.Endpoints) is not equal to 2")
} }
} }
func AssertStringValue(t *testing.T, expect any, v Value) bool {
t.Helper()
vs, err := v.String()
if err != nil {
t.Errorf("get string value error: %s", err.Error())
return false
}
if vs != expect {
t.Errorf("value '%s' not equal '%s'", vs, expect)
return false
}
return true
}
func TestConfig_WatchBetweenSourcesReferenceConfig(t *testing.T) {
var testJson = `
{
"foo": "${remote.foo}"
}
`
var testJson2 = `
{
"remote": {
"foo": "bar"
}
}
`
var testJsonUpdate = `
{
"remote": {
"foo": "bar2"
}
}
`
var src1 = newTestJSONSource(testJson)
var src2 = newTestJSONSource(testJson2)
opts := options{
sources: []Source{src1, src2},
decoder: defaultDecoder,
resolver: defaultResolver,
}
cf := &config{}
cf.opts = opts
cf.reader = newReader(opts)
// load config
if err := cf.Load(); err != nil {
t.Fatal(err)
}
if !AssertStringValue(t, "bar", cf.Value("foo")) {
t.FailNow()
}
if !AssertStringValue(t, "bar", cf.Value("remote.foo")) {
t.FailNow()
}
// update remote.foo value
src2.data = testJsonUpdate
src2.sig <- struct{}{}
// wait for watch to finish
src2.sig <- struct{}{}
if !AssertStringValue(t, "bar2", cf.Value("foo")) {
t.FailNow()
}
if !AssertStringValue(t, "bar2", cf.Value("remote.foo")) {
t.FailNow()
}
}

Loading…
Cancel
Save