package apollo

import (
	"context"
	"errors"
	"flag"
	"log"
	"os"
	"strings"
	"sync"
	"time"

	"github.com/go-kratos/kratos/pkg/conf/paladin"
	"github.com/philchia/agollo/v4"
)

var (
	// Compile-time check that *apollo satisfies paladin.Client.
	_            paladin.Client = (*apollo)(nil)
	defaultValue                = ""
)

// apolloWatcher is a single subscription to namespace update events.
type apolloWatcher struct {
	keys []string // in apollo, they're called namespaces
	C    chan paladin.Event
}

// newApolloWatcher returns a watcher for keys with an event buffer of 5.
func newApolloWatcher(keys []string) *apolloWatcher {
	return &apolloWatcher{keys: keys, C: make(chan paladin.Event, 5)}
}

// HasKey reports whether the watcher is interested in key.
// A watcher created without any keys matches every key.
func (aw *apolloWatcher) HasKey(key string) bool {
	if len(aw.keys) == 0 {
		return true
	}
	for _, k := range aw.keys {
		if k == key {
			return true
		}
	}
	return false
}

// Handle delivers event to the watcher without blocking; when the buffer is
// full the event is dropped and the drop is logged.
func (aw *apolloWatcher) Handle(event paladin.Event) {
	select {
	case aw.C <- event:
	default:
		log.Printf("paladin: event channel full discard ns %s update event", event.Key)
	}
}

// apollo is apollo config client.
type apollo struct {
	client agollo.Client
	values *paladin.Map

	wmu      sync.RWMutex // guards watchers
	watchers map[*apolloWatcher]struct{}
}

// Config is apollo config client config.
type Config struct {
	AppID           string   `json:"app_id"`
	Cluster         string   `json:"cluster"`
	CacheDir        string   `json:"cache_dir"`
	MetaAddr        string   `json:"meta_addr"`
	Namespaces      []string `json:"namespaces"`
	AccesskeySecret string   `json:"accesskey_secret"`
}

// apolloDriver is the driver value registered with paladin in init.
type apolloDriver struct{}

// Flag-backed settings; buildConfigForApollo overrides them from the
// environment when the matching variable is set.
var (
	confAppID, confCluster, confCacheDir, confMetaAddr, confNamespaces, accesskeySecret string
)

func init() {
	addApolloFlags()
	paladin.Register(PaladinDriverApollo, &apolloDriver{})
}

// addApolloFlags registers the apollo.* command line flags.
func addApolloFlags() {
	flag.StringVar(&confAppID, "apollo.appid", "", "apollo app id")
	flag.StringVar(&confCluster, "apollo.cluster", "", "apollo cluster")
	flag.StringVar(&confCacheDir, "apollo.cachedir", "/tmp", "apollo cache dir")
	flag.StringVar(&confMetaAddr, "apollo.metaaddr", "", "apollo meta server addr, e.g. localhost:8080")
	flag.StringVar(&confNamespaces, "apollo.namespaces", "", "subscribed apollo namespaces, comma separated, e.g. app.yml,mysql.yml")
	flag.StringVar(&accesskeySecret, "apollo.accesskeysecret", "", "apollo accesskeysecret")
}

// buildConfigForApollo assembles a Config from the apollo.* flags and the
// APOLLO_* environment variables. A non-empty environment variable takes
// precedence over (and overwrites) the corresponding flag value.
//
// It returns an error when the app id, cluster, meta address or namespace
// list is missing. The cache dir and access key secret are optional.
func buildConfigForApollo() (c *Config, err error) {
	if v := os.Getenv("APOLLO_APP_ID"); v != "" {
		confAppID = v
	}
	if confAppID == "" {
		return nil, errors.New("invalid apollo appid, pass it via APOLLO_APP_ID=xxx with env or --apollo.appid=xxx with flag")
	}

	if v := os.Getenv("APOLLO_CLUSTER"); v != "" {
		confCluster = v
	}
	if confCluster == "" {
		return nil, errors.New("invalid apollo cluster, pass it via APOLLO_CLUSTER=xxx with env or --apollo.cluster=xxx with flag")
	}

	if v := os.Getenv("APOLLO_CACHE_DIR"); v != "" {
		confCacheDir = v
	}

	if v := os.Getenv("APOLLO_META_ADDR"); v != "" {
		confMetaAddr = v
	}
	if confMetaAddr == "" {
		return nil, errors.New("invalid apollo meta addr, pass it via APOLLO_META_ADDR=xxx with env or --apollo.metaaddr=xxx with flag")
	}

	if v := os.Getenv("APOLLO_NAMESPACES"); v != "" {
		confNamespaces = v
	}
	// strings.Split never returns an empty slice for a non-empty separator
	// ("" splits into [""]), so blank entries have to be filtered out before
	// the emptiness check can detect a missing namespace list.
	var namespaceNames []string
	for _, name := range strings.Split(confNamespaces, ",") {
		if name = strings.TrimSpace(name); name != "" {
			namespaceNames = append(namespaceNames, name)
		}
	}
	if len(namespaceNames) == 0 {
		return nil, errors.New("invalid apollo namespaces, pass it via APOLLO_NAMESPACES=xxx with env or --apollo.namespaces=xxx with flag")
	}

	if v := os.Getenv("APOLLO_ACCESS_KEY_SECRET"); v != "" {
		accesskeySecret = v
	}

	return &Config{
		AppID:           confAppID,
		Cluster:         confCluster,
		CacheDir:        confCacheDir,
		MetaAddr:        confMetaAddr,
		Namespaces:      namespaceNames,
		AccesskeySecret: accesskeySecret,
	}, nil
}

// New creates an apollo config client.
// It watches apollo namespace changes and updates the local cache.
// In this package, a namespace in apollo corresponds to a key in paladin.
func (ad *apolloDriver) New() (paladin.Client, error) { c, err := buildConfigForApollo() if err != nil { return nil, err } return ad.new(c) } func (ad *apolloDriver) new(conf *Config) (paladin.Client, error) { if conf == nil { err := errors.New("invalid apollo conf") return nil, err } client := agollo.NewClient(&agollo.Conf{ AppID: conf.AppID, Cluster: conf.Cluster, NameSpaceNames: conf.Namespaces, // these namespaces will be subscribed at init CacheDir: conf.CacheDir, MetaAddr: conf.MetaAddr, AccesskeySecret: conf.AccesskeySecret, }) err := client.Start() if err != nil { return nil, err } a := &apollo{ client: client, values: new(paladin.Map), watchers: make(map[*apolloWatcher]struct{}), } raws, err := a.loadValues(conf.Namespaces) if err != nil { return nil, err } a.values.Store(raws) // watch namespaces by default. a.WatchEvent(context.TODO(), conf.Namespaces...) go a.watchproc(conf.Namespaces) return a, nil } // loadValues load values from apollo namespaces to values func (a *apollo) loadValues(keys []string) (values map[string]*paladin.Value, err error) { values = make(map[string]*paladin.Value, len(keys)) for _, k := range keys { if values[k], err = a.loadValue(k); err != nil { return } } return } // loadValue load value from apollo namespace content to value func (a *apollo) loadValue(key string) (*paladin.Value, error) { content := a.client.GetContent(agollo.WithNamespace(key)) return paladin.NewValue(content, content), nil } // reloadValue reload value by key and send event func (a *apollo) reloadValue(key string) (err error) { // NOTE: in some case immediately read content from client after receive event // will get old content due to cache, sleep 100ms make sure get correct content. 
time.Sleep(100 * time.Millisecond) var ( value *paladin.Value rawValue string ) value, err = a.loadValue(key) if err != nil { return } rawValue, err = value.Raw() if err != nil { return } raws := a.values.Load() raws[key] = value a.values.Store(raws) a.wmu.RLock() n := 0 for w := range a.watchers { if w.HasKey(key) { n++ // FIXME(Colstuwjx): check change event and send detail type like EventAdd\Update\Delete. w.Handle(paladin.Event{Event: paladin.EventUpdate, Key: key, Value: rawValue}) } } a.wmu.RUnlock() log.Printf("paladin: reload config: %s events: %d\n", key, n) return } // apollo config daemon to watch remote apollo notifications func (a *apollo) watchproc(keys []string) { a.client.OnUpdate(func(event *agollo.ChangeEvent) { if err := a.reloadValue(event.Namespace); err != nil { log.Printf("paladin: load key: %s error: %s, skipped", event.Namespace, err) } }) } // Get return value by key. func (a *apollo) Get(key string) *paladin.Value { return a.values.Get(key) } // GetAll return value map. func (a *apollo) GetAll() *paladin.Map { return a.values } // WatchEvent watch with the specified keys. func (a *apollo) WatchEvent(ctx context.Context, keys ...string) <-chan paladin.Event { aw := newApolloWatcher(keys) err := a.client.SubscribeToNamespaces(keys...) if err != nil { log.Printf("subscribe namespaces %v failed, %v", keys, err) return aw.C } a.wmu.Lock() a.watchers[aw] = struct{}{} a.wmu.Unlock() return aw.C } // Close close watcher. func (a *apollo) Close() (err error) { if err = a.client.Stop(); err != nil { return } a.wmu.RLock() for w := range a.watchers { close(w.C) } a.wmu.RUnlock() return }