fix(config/apollo): apollo namespace (#1516)

* fix(config/apollo): support multiple namespace

* fix(config/apollo): modify example and test

* fix(config/apollo): recoding watcher

* styl(config/apollo): package sort; use log instead of fmt

* styl(config/apollo): use kratos/log package instead of fmt

* styl(config/apollo): optimise code with reviewer advises; fix some edge cases on genKey function.

* styl(config/apollo): rename `convertProperties` as `resolve`
pull/1576/head
Yeqllo 3 years ago committed by GitHub
parent eb0730a1b0
commit eec45a3d0a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 121
      contrib/config/apollo/apollo.go
  2. 172
      contrib/config/apollo/apollo_test.go
  3. 1
      contrib/config/apollo/go.mod
  4. 25
      contrib/config/apollo/json_parser.go
  5. 85
      contrib/config/apollo/watcher.go
  6. 4
      examples/config/apollo/README.md
  7. 83
      examples/config/apollo/main.go

@ -1,7 +1,12 @@
package apollo
import (
"fmt"
"strings"
"github.com/go-kratos/kratos/v2/config"
"github.com/go-kratos/kratos/v2/encoding"
"github.com/go-kratos/kratos/v2/log"
"github.com/apolloconfig/agollo/v4"
apolloConfig "github.com/apolloconfig/agollo/v4/env/config"
@ -9,6 +14,7 @@ import (
type apollo struct {
client *agollo.Client
opt *options
}
// Option is apollo option
@ -22,6 +28,8 @@ type options struct {
namespace string
isBackupConfig bool
backupPath string
logger log.Logger
}
// WithAppID with apollo config app id
@ -80,8 +88,19 @@ func WithBackupPath(backupPath string) Option {
}
}
// WithLogger use custom logger to replace default logger.
func WithLogger(logger log.Logger) Option {
return func(o *options) {
if logger != nil {
o.logger = logger
}
}
}
func NewSource(opts ...Option) config.Source {
op := options{}
op := options{
logger: log.DefaultLogger,
}
for _, o := range opts {
o(&op)
}
@ -99,18 +118,104 @@ func NewSource(opts ...Option) config.Source {
if err != nil {
panic(err)
}
return &apollo{client}
return &apollo{client: client, opt: &op}
}
// genKey got the key of config.KeyValue pair.
// eg: namespace.ext with subKey got namespace.subKey
func genKey(ns, sub string) string {
arr := strings.Split(ns, ".")
if len(arr) < 1 {
return sub
}
if len(arr) == 1 {
if ns == "" {
return sub
}
return ns + "." + sub
}
return strings.Join(arr[:len(arr)-1], ".") + "." + sub
}
// resolve convert kv pair into one map[string]interface{} by split key into different
// map level. such as: app.name = "application" => map[app][name] = "application"
func resolve(key string, value interface{}, target map[string]interface{}) {
// expand key "aaa.bbb" into map[aaa]map[bbb]interface{}
keys := strings.Split(key, ".")
last := len(keys) - 1
cursor := target
for i, k := range keys {
if i == last {
cursor[k] = value
break
}
// not the last key, be deeper
v, ok := cursor[k]
if !ok {
// create a new map
deeper := make(map[string]interface{})
cursor[k] = deeper
cursor = deeper
continue
}
// current exists, then check existing value type, if it's not map
// that means duplicate keys, and at least one is not map instance.
if cursor, ok = v.(map[string]interface{}); !ok {
_ = log.DefaultLogger.Log(log.LevelWarn,
"msg",
fmt.Sprintf("duplicate key: %v\n", strings.Join(keys[:i+1], ".")),
)
break
}
}
}
func format(ns string) string {
arr := strings.Split(ns, ".")
if len(arr) <= 1 {
return "json"
}
return arr[len(arr)-1]
}
func (e *apollo) load() []*config.KeyValue {
kv := make([]*config.KeyValue, 0)
e.client.GetDefaultConfigCache().Range(func(key, value interface{}) bool {
namespaces := strings.Split(e.opt.namespace, ",")
for _, ns := range namespaces {
next := map[string]interface{}{}
e.client.GetConfigCache(ns).Range(func(key, value interface{}) bool {
// all values are out properties format
resolve(genKey(ns, key.(string)), value, next)
return true
})
// serialize the namespace content KeyValue into bytes.
f := format(ns)
codec := encoding.GetCodec(f)
val, err := codec.Marshal(next)
if err != nil {
_ = e.opt.logger.Log(log.LevelWarn,
"msg",
fmt.Sprintf("apollo could not handle namespace %s: %v", ns, err),
)
continue
}
kv = append(kv, &config.KeyValue{
Key: key.(string),
Value: []byte(value.(string)),
Key: ns,
Value: val,
Format: f,
})
return true
})
}
return kv
}
@ -119,7 +224,7 @@ func (e *apollo) Load() (kv []*config.KeyValue, err error) {
}
func (e *apollo) Watch() (config.Watcher, error) {
w, err := NewWatcher(e)
w, err := newWatcher(e, e.opt.logger)
if err != nil {
return nil, err
}

@ -0,0 +1,172 @@
package apollo
import (
"testing"
"github.com/stretchr/testify/assert"
)
func Test_genKey(t *testing.T) {
type args struct {
ns string
sub string
}
tests := []struct {
name string
args args
want string
}{
{
name: "case 1",
args: args{
ns: "",
sub: "has_no_ns",
},
want: "has_no_ns",
},
{
name: "case 2",
args: args{
ns: "ns.ext",
sub: "sub",
},
want: "ns.sub",
},
{
name: "case 3",
args: args{
ns: "",
sub: "",
},
want: "",
},
{
name: "case 4",
args: args{
ns: "ns.ext",
sub: "sub.sub2.sub3",
},
want: "ns.sub.sub2.sub3",
},
{
name: "case 5",
args: args{
ns: "ns.more.ext",
sub: "sub.sub2.sub3",
},
want: "ns.more.sub.sub2.sub3",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := genKey(tt.args.ns, tt.args.sub); got != tt.want {
t.Errorf("genKey() = %v, want %v", got, tt.want)
}
})
}
}
func Test_format(t *testing.T) {
type args struct {
ns string
}
tests := []struct {
name string
args args
want string
}{
{
name: "case 0",
args: args{
ns: "ns.yaml",
},
want: "yaml",
},
{
name: "case 1",
args: args{
ns: "ns",
},
want: "json",
},
{
name: "case 2",
args: args{
ns: "ns.more.json",
},
want: "json",
},
{
name: "case 3",
args: args{
ns: "",
},
want: "json",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
if got := format(tt.args.ns); got != tt.want {
t.Errorf("format() = %v, want %v", got, tt.want)
}
})
}
}
func Test_convertProperties(t *testing.T) {
type args struct {
key string
value interface{}
target map[string]interface{}
}
tests := []struct {
name string
args args
want map[string]interface{}
}{
{
name: "case 0",
args: args{
key: "application.name",
value: "app name",
target: map[string]interface{}{},
},
want: map[string]interface{}{
"application": map[string]interface{}{
"name": "app name",
},
},
},
{
name: "case 1",
args: args{
key: "application",
value: []string{"1", "2", "3"},
target: map[string]interface{}{},
},
want: map[string]interface{}{
"application": []string{"1", "2", "3"},
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
resolve(tt.args.key, tt.args.value, tt.args.target)
assert.Equal(t, tt.want, tt.args.target)
})
}
}
func Test_convertProperties_duplicate(t *testing.T) {
target := map[string]interface{}{}
resolve("application.name", "name", target)
assert.Contains(t, target, "application")
assert.Contains(t, target["application"], "name")
assert.Equal(t, "name", target["application"].(map[string]interface{})["name"])
// cause duplicate, the oldest value will be kept
resolve("application.name.first", "first name", target)
assert.Contains(t, target, "application")
assert.Contains(t, target["application"], "name")
assert.Equal(t, "name", target["application"].(map[string]interface{})["name"])
}

@ -4,6 +4,7 @@ go 1.16
require (
github.com/apolloconfig/agollo/v4 v4.0.8
github.com/stretchr/testify v1.7.0
github.com/go-kratos/kratos/v2 v2.1.0
)

@ -0,0 +1,25 @@
package apollo
import (
"encoding/json"
"github.com/apolloconfig/agollo/v4/constant"
"github.com/apolloconfig/agollo/v4/extension"
)
type jsonExtParser struct{}
func (j jsonExtParser) Parse(configContent interface{}) (map[string]interface{}, error) {
v, ok := configContent.(string)
if !ok {
return nil, nil
}
out := make(map[string]interface{}, 4)
err := json.Unmarshal([]byte(v), &out)
return out, err
}
func init() {
// add json format
extension.AddFormatParser(constant.JSON, &jsonExtParser{})
}

@ -1,53 +1,90 @@
package apollo
import (
"fmt"
"github.com/go-kratos/kratos/v2/config"
"github.com/go-kratos/kratos/v2/encoding"
"github.com/go-kratos/kratos/v2/log"
"github.com/apolloconfig/agollo/v4/storage"
)
type watcher struct {
event chan []*config.KeyValue
out <-chan []*config.KeyValue
cancelFn func()
}
type customChangeListener struct {
event chan []*config.KeyValue
in chan<- []*config.KeyValue
logger log.Logger
}
func (c *customChangeListener) OnChange(changeEvent *storage.ChangeEvent) {
kv := make([]*config.KeyValue, 0)
for key, value := range changeEvent.Changes {
kv = append(kv, &config.KeyValue{
Key: key,
Value: []byte(value.NewValue.(string)),
})
func (c *customChangeListener) onChange(
namespace string, changes map[string]*storage.ConfigChange) []*config.KeyValue {
kv := make([]*config.KeyValue, 0, 2)
next := make(map[string]interface{})
for key, change := range changes {
resolve(genKey(namespace, key), change.NewValue, next)
}
c.event <- kv
f := format(namespace)
codec := encoding.GetCodec(f)
val, err := codec.Marshal(next)
if err != nil {
_ = c.logger.Log(log.LevelWarn,
"msg",
fmt.Sprintf("apollo could not handle namespace %s: %v", namespace, err),
)
return nil
}
kv = append(kv, &config.KeyValue{
Key: namespace,
Value: val,
Format: f,
})
return kv
}
func (c *customChangeListener) OnNewestChange(changeEvent *storage.FullChangeEvent) {
kv := make([]*config.KeyValue, 0)
for key, value := range changeEvent.Changes {
kv = append(kv, &config.KeyValue{
Key: key,
Value: []byte(value.(string)),
})
func (c *customChangeListener) OnChange(changeEvent *storage.ChangeEvent) {
change := c.onChange(changeEvent.Namespace, changeEvent.Changes)
if len(change) == 0 {
return
}
c.event <- kv
c.in <- change
}
func NewWatcher(a *apollo) (config.Watcher, error) {
e := make(chan []*config.KeyValue)
a.client.AddChangeListener(&customChangeListener{event: e})
return &watcher{event: e}, nil
func (c *customChangeListener) OnNewestChange(changeEvent *storage.FullChangeEvent) {}
func newWatcher(a *apollo, logger log.Logger) (config.Watcher, error) {
if logger == nil {
logger = log.DefaultLogger
}
changeCh := make(chan []*config.KeyValue)
a.client.AddChangeListener(&customChangeListener{in: changeCh, logger: logger})
return &watcher{
out: changeCh,
cancelFn: func() {
close(changeCh)
},
}, nil
}
// Next will be blocked until the Stop method is called
func (w *watcher) Next() ([]*config.KeyValue, error) {
return <-w.event, nil
return <-w.out, nil
}
func (w *watcher) Stop() error {
close(w.event)
if w.cancelFn != nil {
w.cancelFn()
}
return nil
}

@ -4,3 +4,7 @@ You can deploy Apollo yourself or use docker compose in example to start Apollo,
then modify the configuration in the code to your actual Apollo configuration,
and run the program
### Sample account
Account: `apollo`
Password: `admin`

@ -1,12 +1,35 @@
package main
import (
"fmt"
"log"
_ "github.com/go-kratos/kratos/v2/encoding/json"
_ "github.com/go-kratos/kratos/v2/encoding/yaml"
"github.com/go-kratos/kratos/contrib/config/apollo/v2"
"github.com/go-kratos/kratos/v2/config"
)
type bootstrap struct {
Application struct {
Name string `json:"name"`
Version string `json:"version"`
} `json:"application"`
Event struct {
Key string `json:"key"`
Array []string `json:"array"`
} `json:"event"`
Demo struct {
Deep struct {
Key string `json:"key"`
Value string `json:"value"`
} `json:"deep"`
} `json:"demo"`
}
func main() {
c := config.New(
config.WithSource(
@ -14,51 +37,45 @@ func main() {
apollo.WithAppID("kratos"),
apollo.WithCluster("dev"),
apollo.WithEndpoint("http://localhost:8080"),
apollo.WithNamespace("application"),
apollo.WithNamespace("application,event.yaml,demo.json"),
apollo.WithEnableBackup(),
apollo.WithSecret("895da1a174934ababb1b1223f5620a45"),
apollo.WithSecret("ad75b33c77ae4b9c9626d969c44f41ee"),
),
),
)
var bc bootstrap
if err := c.Load(); err != nil {
panic(err)
}
// Get a value associated with the key
name, err := c.Value("name").String()
if err != nil {
panic(err)
}
log.Printf("service: %s", name)
// Defines the config JSON Field
var v struct {
Name string `json:"name"`
Version string `json:"version"`
}
scan(c, &bc)
// Unmarshal the config to struct
if err = c.Scan(&v); err != nil {
panic(err)
}
log.Printf("config: %+v", v)
value(c, "application")
value(c, "application.name")
value(c, "event.array")
value(c, "demo.deep")
// Get a value associated with the key
name, err = c.Value("name").String()
if err != nil {
panic(err)
}
log.Printf("service: %s", name)
watch(c, "application")
<-make(chan struct{})
}
// watch key
if err = c.Watch("name", func(key string, value config.Value) {
n, e := value.String()
if e != nil {
panic(e)
}
log.Printf("config changed: %s = %s\n", key, n)
func scan(c config.Config, bc *bootstrap) {
err := c.Scan(bc)
fmt.Printf("=========== scan result =============\n")
fmt.Printf("err: %v\n", err)
fmt.Printf("cfg: %+v\n\n", bc)
}
func value(c config.Config, key string) {
fmt.Printf("=========== value result =============\n")
v := c.Value(key).Load()
fmt.Printf("key=%s, load: %+v\n\n", key, v)
}
func watch(c config.Config, key string) {
if err := c.Watch(key, func(key string, value config.Value) {
log.Printf("config(key=%s) changed: %s\n", key, value.Load())
}); err != nil {
panic(err)
}
<-make(chan struct{})
}

Loading…
Cancel
Save