From eec45a3d0ac0ce2e9ac3161c920699eb87cc438d Mon Sep 17 00:00:00 2001 From: Yeqllo Date: Thu, 21 Oct 2021 12:48:53 +0800 Subject: [PATCH] fix(config/apollo): apollo namespace (#1516) * fix(config/apollo): support multiple namespace * fix(config/apollo): modify example and test * fix(config/apollo): recoding watcher * styl(config/apollo): package sort; use log instead of fmt * styl(config/apollo): use kratos/log package instead of fmt * styl(config/apollo): optimise code with reviewer advises; fix some edge cases on genKey function. * styl(config/apollo): rename `convertProperties` as `resolve` --- contrib/config/apollo/apollo.go | 121 +++++++++++++++++-- contrib/config/apollo/apollo_test.go | 172 +++++++++++++++++++++++++++ contrib/config/apollo/go.mod | 1 + contrib/config/apollo/json_parser.go | 25 ++++ contrib/config/apollo/watcher.go | 85 +++++++++---- examples/config/apollo/README.md | 4 + examples/config/apollo/main.go | 83 ++++++++----- 7 files changed, 426 insertions(+), 65 deletions(-) create mode 100644 contrib/config/apollo/apollo_test.go create mode 100644 contrib/config/apollo/json_parser.go diff --git a/contrib/config/apollo/apollo.go b/contrib/config/apollo/apollo.go index 8df7465b2..866c0ea3f 100644 --- a/contrib/config/apollo/apollo.go +++ b/contrib/config/apollo/apollo.go @@ -1,7 +1,12 @@ package apollo import ( + "fmt" + "strings" + "github.com/go-kratos/kratos/v2/config" + "github.com/go-kratos/kratos/v2/encoding" + "github.com/go-kratos/kratos/v2/log" "github.com/apolloconfig/agollo/v4" apolloConfig "github.com/apolloconfig/agollo/v4/env/config" @@ -9,6 +14,7 @@ import ( type apollo struct { client *agollo.Client + opt *options } // Option is apollo option @@ -22,6 +28,8 @@ type options struct { namespace string isBackupConfig bool backupPath string + + logger log.Logger } // WithAppID with apollo config app id @@ -80,8 +88,19 @@ func WithBackupPath(backupPath string) Option { } } +// WithLogger use custom logger to replace default logger. +func WithLogger(logger log.Logger) Option { + return func(o *options) { + if logger != nil { + o.logger = logger + } + } +} + func NewSource(opts ...Option) config.Source { - op := options{} + op := options{ + logger: log.DefaultLogger, + } for _, o := range opts { o(&op) } @@ -99,18 +118,104 @@ func NewSource(opts ...Option) config.Source { if err != nil { panic(err) } - return &apollo{client} + + return &apollo{client: client, opt: &op} +} + +// genKey got the key of config.KeyValue pair. +// eg: namespace.ext with subKey got namespace.subKey +func genKey(ns, sub string) string { + arr := strings.Split(ns, ".") + if len(arr) < 1 { + return sub + } + + if len(arr) == 1 { + if ns == "" { + return sub + } + return ns + "." + sub + } + + return strings.Join(arr[:len(arr)-1], ".") + "." + sub +} + +// resolve convert kv pair into one map[string]interface{} by split key into different +// map level. such as: app.name = "application" => map[app][name] = "application" +func resolve(key string, value interface{}, target map[string]interface{}) { + // expand key "aaa.bbb" into map[aaa]map[bbb]interface{} + keys := strings.Split(key, ".") + last := len(keys) - 1 + cursor := target + + for i, k := range keys { + if i == last { + cursor[k] = value + break + } + + // not the last key, be deeper + v, ok := cursor[k] + if !ok { + // create a new map + deeper := make(map[string]interface{}) + cursor[k] = deeper + cursor = deeper + continue + } + + // current exists, then check existing value type, if it's not map + // that means duplicate keys, and at least one is not map instance. + if cursor, ok = v.(map[string]interface{}); !ok { + _ = log.DefaultLogger.Log(log.LevelWarn, + "msg", + fmt.Sprintf("duplicate key: %v\n", strings.Join(keys[:i+1], ".")), + ) + break + } + } +} + +func format(ns string) string { + arr := strings.Split(ns, ".") + if len(arr) <= 1 { + return "json" + } + + return arr[len(arr)-1] } func (e *apollo) load() []*config.KeyValue { kv := make([]*config.KeyValue, 0) - e.client.GetDefaultConfigCache().Range(func(key, value interface{}) bool { + namespaces := strings.Split(e.opt.namespace, ",") + + for _, ns := range namespaces { + next := map[string]interface{}{} + e.client.GetConfigCache(ns).Range(func(key, value interface{}) bool { + // all values are out properties format + resolve(genKey(ns, key.(string)), value, next) + return true + }) + + // serialize the namespace content KeyValue into bytes. + f := format(ns) + codec := encoding.GetCodec(f) + val, err := codec.Marshal(next) + if err != nil { + _ = e.opt.logger.Log(log.LevelWarn, + "msg", + fmt.Sprintf("apollo could not handle namespace %s: %v", ns, err), + ) + continue + } + kv = append(kv, &config.KeyValue{ - Key: key.(string), - Value: []byte(value.(string)), + Key: ns, + Value: val, + Format: f, }) - return true - }) + } + return kv } @@ -119,7 +224,7 @@ func (e *apollo) Load() (kv []*config.KeyValue, err error) { } func (e *apollo) Watch() (config.Watcher, error) { - w, err := NewWatcher(e) + w, err := newWatcher(e, e.opt.logger) if err != nil { return nil, err } diff --git a/contrib/config/apollo/apollo_test.go b/contrib/config/apollo/apollo_test.go new file mode 100644 index 000000000..b881fbd40 --- /dev/null +++ b/contrib/config/apollo/apollo_test.go @@ -0,0 +1,172 @@ +package apollo + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func Test_genKey(t *testing.T) { + type args struct { + ns string + sub string + } + tests := []struct { + name string + args args + want string + }{ + { + name: "case 1", + args: args{ + ns: "", + sub: "has_no_ns", + }, + want: "has_no_ns", + }, + { + name: "case 2", + args: args{ + ns: "ns.ext", + sub: "sub", + }, + want: "ns.sub", + }, + { + name: "case 3", + args: args{ + ns: "", + sub: "", + }, + want: "", + }, + { + name: "case 4", + args: args{ + ns: "ns.ext", + sub: "sub.sub2.sub3", + }, + want: "ns.sub.sub2.sub3", + }, + { + name: "case 5", + args: args{ + ns: "ns.more.ext", + sub: "sub.sub2.sub3", + }, + want: "ns.more.sub.sub2.sub3", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := genKey(tt.args.ns, tt.args.sub); got != tt.want { + t.Errorf("genKey() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_format(t *testing.T) { + type args struct { + ns string + } + tests := []struct { + name string + args args + want string + }{ + { + name: "case 0", + args: args{ + ns: "ns.yaml", + }, + want: "yaml", + }, + { + name: "case 1", + args: args{ + ns: "ns", + }, + want: "json", + }, + { + name: "case 2", + args: args{ + ns: "ns.more.json", + }, + want: "json", + }, + { + name: "case 3", + args: args{ + ns: "", + }, + want: "json", + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := format(tt.args.ns); got != tt.want { + t.Errorf("format() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_convertProperties(t *testing.T) { + type args struct { + key string + value interface{} + target map[string]interface{} + } + tests := []struct { + name string + args args + want map[string]interface{} + }{ + { + name: "case 0", + args: args{ + key: "application.name", + value: "app name", + target: map[string]interface{}{}, + }, + want: map[string]interface{}{ + "application": map[string]interface{}{ + "name": "app name", + }, + }, + }, + { + name: "case 1", + args: args{ + key: "application", + value: []string{"1", "2", "3"}, + target: map[string]interface{}{}, + }, + want: map[string]interface{}{ + "application": []string{"1", "2", "3"}, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + resolve(tt.args.key, tt.args.value, tt.args.target) + assert.Equal(t, tt.want, tt.args.target) + }) + } +} + +func Test_convertProperties_duplicate(t *testing.T) { + target := map[string]interface{}{} + resolve("application.name", "name", target) + assert.Contains(t, target, "application") + assert.Contains(t, target["application"], "name") + assert.Equal(t, "name", target["application"].(map[string]interface{})["name"]) + + // cause duplicate, the oldest value will be kept + resolve("application.name.first", "first name", target) + assert.Contains(t, target, "application") + assert.Contains(t, target["application"], "name") + assert.Equal(t, "name", target["application"].(map[string]interface{})["name"]) +} diff --git a/contrib/config/apollo/go.mod b/contrib/config/apollo/go.mod index cd31ae639..b7b04ea16 100644 --- a/contrib/config/apollo/go.mod +++ b/contrib/config/apollo/go.mod @@ -4,6 +4,7 @@ go 1.16 require ( github.com/apolloconfig/agollo/v4 v4.0.8 + github.com/stretchr/testify v1.7.0 github.com/go-kratos/kratos/v2 v2.1.0 ) diff --git a/contrib/config/apollo/json_parser.go b/contrib/config/apollo/json_parser.go new file mode 100644 index 000000000..827c7b29b --- /dev/null +++ b/contrib/config/apollo/json_parser.go @@ -0,0 +1,25 @@ +package apollo + +import ( + "encoding/json" + + "github.com/apolloconfig/agollo/v4/constant" + "github.com/apolloconfig/agollo/v4/extension" +) + +type jsonExtParser struct{} + +func (j jsonExtParser) Parse(configContent interface{}) (map[string]interface{}, error) { + v, ok := configContent.(string) + if !ok { + return nil, nil + } + out := make(map[string]interface{}, 4) + err := json.Unmarshal([]byte(v), &out) + return out, err +} + +func init() { + // add json format + extension.AddFormatParser(constant.JSON, &jsonExtParser{}) +} diff --git a/contrib/config/apollo/watcher.go b/contrib/config/apollo/watcher.go index a9adf1e94..b1eea6a31 100644 --- a/contrib/config/apollo/watcher.go +++ b/contrib/config/apollo/watcher.go @@ -1,53 +1,90 @@ package apollo import ( + "fmt" + "github.com/go-kratos/kratos/v2/config" + "github.com/go-kratos/kratos/v2/encoding" + "github.com/go-kratos/kratos/v2/log" "github.com/apolloconfig/agollo/v4/storage" ) type watcher struct { - event chan []*config.KeyValue + out <-chan []*config.KeyValue + cancelFn func() } type customChangeListener struct { - event chan []*config.KeyValue + in chan<- []*config.KeyValue + logger log.Logger } -func (c *customChangeListener) OnChange(changeEvent *storage.ChangeEvent) { - kv := make([]*config.KeyValue, 0) - for key, value := range changeEvent.Changes { - kv = append(kv, &config.KeyValue{ - Key: key, - Value: []byte(value.NewValue.(string)), - }) +func (c *customChangeListener) onChange( + namespace string, changes map[string]*storage.ConfigChange) []*config.KeyValue { + + kv := make([]*config.KeyValue, 0, 2) + next := make(map[string]interface{}) + + for key, change := range changes { + resolve(genKey(namespace, key), change.NewValue, next) } - c.event <- kv + + f := format(namespace) + codec := encoding.GetCodec(f) + val, err := codec.Marshal(next) + if err != nil { + _ = c.logger.Log(log.LevelWarn, + "msg", + fmt.Sprintf("apollo could not handle namespace %s: %v", namespace, err), + ) + return nil + } + kv = append(kv, &config.KeyValue{ + Key: namespace, + Value: val, + Format: f, + }) + + return kv } -func (c *customChangeListener) OnNewestChange(changeEvent *storage.FullChangeEvent) { - kv := make([]*config.KeyValue, 0) - for key, value := range changeEvent.Changes { - kv = append(kv, &config.KeyValue{ - Key: key, - Value: []byte(value.(string)), - }) +func (c *customChangeListener) OnChange(changeEvent *storage.ChangeEvent) { + change := c.onChange(changeEvent.Namespace, changeEvent.Changes) + if len(change) == 0 { + return } - c.event <- kv + + c.in <- change } -func NewWatcher(a *apollo) (config.Watcher, error) { - e := make(chan []*config.KeyValue) - a.client.AddChangeListener(&customChangeListener{event: e}) - return &watcher{event: e}, nil +func (c *customChangeListener) OnNewestChange(changeEvent *storage.FullChangeEvent) {} + +func newWatcher(a *apollo, logger log.Logger) (config.Watcher, error) { + if logger == nil { + logger = log.DefaultLogger + } + + changeCh := make(chan []*config.KeyValue) + a.client.AddChangeListener(&customChangeListener{in: changeCh, logger: logger}) + + return &watcher{ + out: changeCh, + cancelFn: func() { + close(changeCh) + }, + }, nil } // Next will be blocked until the Stop method is called func (w *watcher) Next() ([]*config.KeyValue, error) { - return <-w.event, nil + return <-w.out, nil } func (w *watcher) Stop() error { - close(w.event) + if w.cancelFn != nil { + w.cancelFn() + } + return nil } diff --git a/examples/config/apollo/README.md b/examples/config/apollo/README.md index c388e78a0..e23004bb6 100644 --- a/examples/config/apollo/README.md +++ b/examples/config/apollo/README.md @@ -4,3 +4,7 @@ You can deploy Apollo yourself or use docker compose in example to start Apollo, then modify the configuration in the code to your actual Apollo configuration, and run the program +### Sample account + +Account: `apollo` +Password: `admin` \ No newline at end of file diff --git a/examples/config/apollo/main.go b/examples/config/apollo/main.go index 1e756aef8..90efd83a4 100644 --- a/examples/config/apollo/main.go +++ b/examples/config/apollo/main.go @@ -1,12 +1,35 @@ package main import ( + "fmt" "log" + _ "github.com/go-kratos/kratos/v2/encoding/json" + _ "github.com/go-kratos/kratos/v2/encoding/yaml" + "github.com/go-kratos/kratos/contrib/config/apollo/v2" "github.com/go-kratos/kratos/v2/config" ) +type bootstrap struct { + Application struct { + Name string `json:"name"` + Version string `json:"version"` + } `json:"application"` + + Event struct { + Key string `json:"key"` + Array []string `json:"array"` + } `json:"event"` + + Demo struct { + Deep struct { + Key string `json:"key"` + Value string `json:"value"` + } `json:"deep"` + } `json:"demo"` +} + func main() { c := config.New( config.WithSource( @@ -14,51 +37,45 @@ func main() { apollo.WithAppID("kratos"), apollo.WithCluster("dev"), apollo.WithEndpoint("http://localhost:8080"), - apollo.WithNamespace("application"), + apollo.WithNamespace("application,event.yaml,demo.json"), apollo.WithEnableBackup(), - apollo.WithSecret("895da1a174934ababb1b1223f5620a45"), + apollo.WithSecret("ad75b33c77ae4b9c9626d969c44f41ee"), ), ), ) + var bc bootstrap if err := c.Load(); err != nil { panic(err) } - // Get a value associated with the key - name, err := c.Value("name").String() - if err != nil { - panic(err) - } - log.Printf("service: %s", name) - // Defines the config JSON Field - var v struct { - Name string `json:"name"` - Version string `json:"version"` - } + scan(c, &bc) - // Unmarshal the config to struct - if err = c.Scan(&v); err != nil { - panic(err) - } - log.Printf("config: %+v", v) + value(c, "application") + value(c, "application.name") + value(c, "event.array") + value(c, "demo.deep") - // Get a value associated with the key - name, err = c.Value("name").String() - if err != nil { - panic(err) - } - log.Printf("service: %s", name) + watch(c, "application") + <-make(chan struct{}) +} - // watch key - if err = c.Watch("name", func(key string, value config.Value) { - n, e := value.String() - if e != nil { - panic(e) - } - log.Printf("config changed: %s = %s\n", key, n) +func scan(c config.Config, bc *bootstrap) { + err := c.Scan(bc) + fmt.Printf("=========== scan result =============\n") + fmt.Printf("err: %v\n", err) + fmt.Printf("cfg: %+v\n\n", bc) +} + +func value(c config.Config, key string) { + fmt.Printf("=========== value result =============\n") + v := c.Value(key).Load() + fmt.Printf("key=%s, load: %+v\n\n", key, v) +} + +func watch(c config.Config, key string) { + if err := c.Watch(key, func(key string, value config.Value) { + log.Printf("config(key=%s) changed: %s\n", key, value.Load()) }); err != nil { panic(err) } - - <-make(chan struct{}) }