diff --git a/config/config.go b/config/config.go index 9efc90240..0f62c5ba1 100644 --- a/config/config.go +++ b/config/config.go @@ -3,6 +3,7 @@ package config import ( "context" "errors" + "fmt" "reflect" "sync" "time" @@ -60,8 +61,28 @@ func New(opts ...Option) Config { } func (c *config) watch(w Watcher) { + var reloadReader = func(opts options) (Reader, error) { + r := newReader(c.opts) + for _, src := range c.opts.sources { + kvs, err := src.Load() + if err != nil { + return nil, fmt.Errorf("failed to load config source: %w", err) + } + for _, v := range kvs { + log.Debugf("config loaded: %s format: %s", v.Key, v.Format) + } + if err = r.Merge(kvs...); err != nil { + return nil, fmt.Errorf("failed to merge config source: %w", err) + } + } + if err := r.Resolve(); err != nil { + return nil, fmt.Errorf("failed to resolve config: %w", err) + } + return r, nil + } + for { - kvs, err := w.Next() + _, err := w.Next() if err != nil { if errors.Is(err, context.Canceled) { log.Infof("watcher's ctx cancel : %v", err) @@ -71,14 +92,12 @@ func (c *config) watch(w Watcher) { log.Errorf("failed to watch next config: %v", err) continue } - if err := c.reader.Merge(kvs...); err != nil { - log.Errorf("failed to merge next config: %v", err) - continue - } - if err := c.reader.Resolve(); err != nil { - log.Errorf("failed to resolve next config: %v", err) + r, err := reloadReader(c.opts) + if err != nil { + log.Errorf("failed to reload reader all sources: %v", err) continue } + c.reader = r c.cached.Range(func(key, value interface{}) bool { k := key.(string) v := value.(Value) diff --git a/config/config_test.go b/config/config_test.go index 9a166b00e..e7c4d9a45 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -182,3 +182,74 @@ func TestConfig(t *testing.T) { t.Error("len(testConf.Endpoints) is not equal to 2") } } + +func AssertStringValue(t *testing.T, expect any, v Value) bool { + t.Helper() + vs, err := v.String() + if err != nil { + t.Errorf("get string value error: %s", err.Error()) + return false + } + if vs != expect { + t.Errorf("value '%s' not equal '%s'", vs, expect) + return false + } + return true +} + +func TestConfig_WatchBetweenSourcesReferenceConfig(t *testing.T) { + var testJson = ` +{ + "foo": "${remote.foo}" +} +` + var testJson2 = ` +{ + "remote": { + "foo": "bar" + } +} +` + var testJsonUpdate = ` +{ + "remote": { + "foo": "bar2" + } +} +` + var src1 = newTestJSONSource(testJson) + var src2 = newTestJSONSource(testJson2) + opts := options{ + sources: []Source{src1, src2}, + decoder: defaultDecoder, + resolver: defaultResolver, + } + cf := &config{} + cf.opts = opts + cf.reader = newReader(opts) + + // load config + if err := cf.Load(); err != nil { + t.Fatal(err) + } + + if !AssertStringValue(t, "bar", cf.Value("foo")) { + t.FailNow() + } + if !AssertStringValue(t, "bar", cf.Value("remote.foo")) { + t.FailNow() + } + + // update remote.foo value + src2.data = testJsonUpdate + src2.sig <- struct{}{} + // wait for watch to finish + src2.sig <- struct{}{} + + if !AssertStringValue(t, "bar2", cf.Value("foo")) { + t.FailNow() + } + if !AssertStringValue(t, "bar2", cf.Value("remote.foo")) { + t.FailNow() + } +}