Compare commits

...

20 Commits

Author SHA1 Message Date
baozhecheng c41d158696 fix 2 years ago
baozhecheng ce8ed7e4c3 fix 2 years ago
baozhecheng 46eea6f3a1 fix 2 years ago
baozhecheng 58e9d41300 fix 2 years ago
baozhecheng f0f494682b fix 2 years ago
baozhecheng 2affd568b1 fix 2 years ago
baozhecheng a25bf5d00f fix 2 years ago
baozhecheng 0ac2cea598 fix 2 years ago
baozhecheng 1e0a3c04be fix 2 years ago
baozhecheng 4e99331879 fix lint 2 years ago
baozhecheng fec026af5f fix version 2 years ago
baozhecheng b3994334ed fix nacos version 2 years ago
baozhecheng 8fd97b6258 Merge branch 'main' into fix/apollo 2 years ago
baozhecheng 7342800caa fix 2 years ago
baozhecheng dad8275bd5 fix lint 2 years ago
baozhecheng 2eea21cc23 fix 2 years ago
baozhecheng 2e38a736da fix 2 years ago
baozhecheng 0017f5aa53 fix 2 years ago
baozhecheng 102092009e fix 2 years ago
baozhecheng c423dc3494 fix: apollo unable to get and watch to the properties file 2 years ago
  1. 12
      .github/workflows/go.yml
  2. 139
      contrib/config/apollo/apollo.go
  3. 14
      contrib/config/apollo/parser.go
  4. 27
      contrib/config/apollo/watcher.go
  5. 5
      contrib/registry/nacos/registry_test.go

@ -29,12 +29,12 @@ jobs:
ports:
- 8500:8500
nacos:
image: nacos/nacos-server:latest
image: nacos/nacos-server:v2.1.0
env:
MODE: standalone
ports:
- "8848:8848"
- "9848:9848"
- 8848:8848
- 9848:9848
polaris:
image: polarismesh/polaris-server-standalone:v1.9.0
ports:
@ -44,7 +44,7 @@ jobs:
steps:
- uses: actions/checkout@v3
- name: Set up Go
uses: actions/setup-go@v3.2.1
uses: actions/setup-go@v3.2.0
with:
go-version: ${{ matrix.go }}
@ -52,8 +52,8 @@ jobs:
run: go build ./...
- name: Test
run: make test-coverage
run: |
make test-coverage
- name: Upload coverage to Codecov
run: bash <(curl -s https://codecov.io/bash)

@ -7,7 +7,10 @@ import (
"github.com/go-kratos/kratos/v2/log"
"github.com/apolloconfig/agollo/v4"
"github.com/apolloconfig/agollo/v4/constant"
apolloConfig "github.com/apolloconfig/agollo/v4/env/config"
"github.com/apolloconfig/agollo/v4/extension"
"github.com/go-kratos/kratos/v2/encoding"
)
type apollo struct {
@ -15,6 +18,13 @@ type apollo struct {
opt *options
}
const (
yaml = "yaml"
yml = "yml"
json = "json"
properties = "properties"
)
// Option is apollo option
type Option func(*options)
@ -26,6 +36,7 @@ type options struct {
namespace string
isBackupConfig bool
backupPath string
originConfig bool
}
// WithAppID with apollo config app id
@ -84,6 +95,16 @@ func WithBackupPath(backupPath string) Option {
}
}
// WithOriginalConfig use the original configuration file without parse processing
func WithOriginalConfig() Option {
return func(o *options) {
extension.AddFormatParser(constant.JSON, &jsonExtParser{})
extension.AddFormatParser(constant.YAML, &yamlExtParser{})
extension.AddFormatParser(constant.YML, &yamlExtParser{})
o.originConfig = true
}
}
func NewSource(opts ...Option) config.Source {
op := options{}
for _, o := range opts {
@ -108,31 +129,78 @@ func NewSource(opts ...Option) config.Source {
func format(ns string) string {
arr := strings.Split(ns, ".")
if len(arr) <= 1 {
return "json"
if len(arr) <= 1 || arr[len(arr)-1] == properties {
return json
}
return arr[len(arr)-1]
}
func (e *apollo) load() []*config.KeyValue {
kv := make([]*config.KeyValue, 0)
kvs := make([]*config.KeyValue, 0)
namespaces := strings.Split(e.opt.namespace, ",")
for _, ns := range namespaces {
if !e.opt.originConfig {
kv, err := e.getConfig(ns)
if err != nil {
log.Errorf("apollo get config failed,err:%v", err)
continue
}
kvs = append(kvs, kv)
continue
}
if strings.Contains(ns, ".") && !strings.Contains(ns, properties) &&
(format(ns) == yaml || format(ns) == yml || format(ns) == json) {
kv, err := e.getOriginConfig(ns)
if err != nil {
log.Errorf("apollo get config failed,err:%v", err)
continue
}
kvs = append(kvs, kv)
continue
} else {
kv, err := e.getConfig(ns)
if err != nil {
log.Errorf("apollo get config failed,err:%v", err)
continue
}
kvs = append(kvs, kv)
}
}
return kvs
}
func (e *apollo) getConfig(ns string) (*config.KeyValue, error) {
next := map[string]interface{}{}
e.client.GetConfigCache(ns).Range(func(key, value interface{}) bool {
// all values are out properties format
resolve(genKey(ns, key.(string)), value, next)
return true
})
f := format(ns)
codec := encoding.GetCodec(f)
val, err := codec.Marshal(next)
if err != nil {
return nil, err
}
return &config.KeyValue{
Key: ns,
Value: val,
Format: f,
}, nil
}
func (e apollo) getOriginConfig(ns string) (*config.KeyValue, error) {
value, err := e.client.GetConfigCache(ns).Get("content")
if err != nil {
log.Warnw("apollo get config failed", "err", err)
return nil, err
}
// serialize the namespace content KeyValue into bytes.
kv = append(kv, &config.KeyValue{
return &config.KeyValue{
Key: ns,
Value: []byte(value.(string)),
Format: format(ns),
})
}
return kv
}, nil
}
func (e *apollo) Load() (kv []*config.KeyValue, err error) {
@ -146,3 +214,54 @@ func (e *apollo) Watch() (config.Watcher, error) {
}
return w, nil
}
// resolve convert kv pair into one map[string]interface{} by split key into different
// map level. such as: app.name = "application" => map[app][name] = "application"
func resolve(key string, value interface{}, target map[string]interface{}) {
// expand key "aaa.bbb" into map[aaa]map[bbb]interface{}
keys := strings.Split(key, ".")
last := len(keys) - 1
cursor := target
for i, k := range keys {
if i == last {
cursor[k] = value
break
}
// not the last key, be deeper
v, ok := cursor[k]
if !ok {
// create a new map
deeper := make(map[string]interface{})
cursor[k] = deeper
cursor = deeper
continue
}
// current exists, then check existing value type, if it's not map
// that means duplicate keys, and at least one is not map instance.
if cursor, ok = v.(map[string]interface{}); !ok {
log.Warnf("duplicate key: %v\n", strings.Join(keys[:i+1], "."))
break
}
}
}
// genKey got the key of config.KeyValue pair.
// eg: namespace.ext with subKey got namespace.subKey
func genKey(ns, sub string) string {
arr := strings.Split(ns, ".")
if len(arr) < 1 {
return sub
}
if len(arr) == 1 {
if ns == "" {
return sub
}
return ns + "." + sub
}
return strings.Join(arr[:len(arr)-1], ".") + "." + sub
}

@ -1,10 +1,5 @@
package apollo
import (
"github.com/apolloconfig/agollo/v4/constant"
"github.com/apolloconfig/agollo/v4/extension"
)
type jsonExtParser struct{}
func (parser jsonExtParser) Parse(configContent interface{}) (map[string]interface{}, error) {
@ -13,13 +8,6 @@ func (parser jsonExtParser) Parse(configContent interface{}) (map[string]interfa
type yamlExtParser struct{}
func (parser yamlExtParser) Parse(configContent interface{}) (out map[string]interface{}, err error) {
func (parser yamlExtParser) Parse(configContent interface{}) (map[string]interface{}, error) {
return map[string]interface{}{"content": configContent}, nil
}
func init() {
// add json/yaml/yml format
extension.AddFormatParser(constant.JSON, &jsonExtParser{})
extension.AddFormatParser(constant.YAML, &yamlExtParser{})
extension.AddFormatParser(constant.YML, &yamlExtParser{})
}

@ -2,6 +2,9 @@ package apollo
import (
"context"
"strings"
"github.com/go-kratos/kratos/v2/encoding"
"github.com/go-kratos/kratos/v2/config"
"github.com/go-kratos/kratos/v2/log"
@ -23,6 +26,8 @@ type customChangeListener struct {
func (c *customChangeListener) onChange(namespace string, changes map[string]*storage.ConfigChange) []*config.KeyValue {
kv := make([]*config.KeyValue, 0, 2)
if strings.Contains(namespace, ".") && !strings.Contains(namespace, properties) &&
(format(namespace) == yaml || format(namespace) == yml || format(namespace) == json) {
value, err := c.apollo.client.GetConfigCache(namespace).Get("content")
if err != nil {
log.Warnw("apollo get config failed", "err", err)
@ -36,6 +41,28 @@ func (c *customChangeListener) onChange(namespace string, changes map[string]*st
return kv
}
next := make(map[string]interface{})
for key, change := range changes {
resolve(genKey(namespace, key), change.NewValue, next)
}
f := format(namespace)
codec := encoding.GetCodec(f)
val, err := codec.Marshal(next)
if err != nil {
log.Warnf("apollo could not handle namespace %s: %v", namespace, err)
return nil
}
kv = append(kv, &config.KeyValue{
Key: namespace,
Value: val,
Format: f,
})
return kv
}
func (c *customChangeListener) OnChange(changeEvent *storage.ChangeEvent) {
change := c.onChange(changeEvent.Namespace, changeEvent.Changes)
if len(change) == 0 {

@ -2,6 +2,8 @@ package nacos
import (
"context"
"fmt"
"os/exec"
"reflect"
"testing"
"time"
@ -14,6 +16,9 @@ import (
)
func TestRegistry_Register(t *testing.T) {
exec := exec.Command("lsof-i:8848")
exec.Run()
fmt.Println(exec.Stdout)
sc := []constant.ServerConfig{
*constant.NewServerConfig("127.0.0.1", 8848),
}

Loading…
Cancel
Save