From fa32988c6f0ee9472b29361c436a7a0c8deefe38 Mon Sep 17 00:00:00 2001 From: Tony Date: Thu, 9 May 2019 22:46:57 +0800 Subject: [PATCH] add file watcher --- pkg/conf/paladin/file.go | 145 +++++++++++++++++++++------------------ 1 file changed, 80 insertions(+), 65 deletions(-) diff --git a/pkg/conf/paladin/file.go b/pkg/conf/paladin/file.go index 7c88b95a1..10d152419 100644 --- a/pkg/conf/paladin/file.go +++ b/pkg/conf/paladin/file.go @@ -15,20 +15,43 @@ import ( "github.com/fsnotify/fsnotify" ) -const ( - defaultChSize = 10 -) - var _ Client = &file{} +type watcher struct { + keys []string + C chan Event +} + +func newWatcher(keys []string) *watcher { + return &watcher{keys: keys, C: make(chan Event, 5)} +} + +func (w *watcher) HasKey(key string) bool { + if len(w.keys) == 0 { + return true + } + for _, k := range w.keys { + if keyNamed(k) == key { + return true + } + } + return false +} + +func (w *watcher) Handle(event Event) { + select { + case w.C <- event: + default: + log.Printf("paladin: event channel full discard file %s update event", event.Key) + } +} + // file is file config client. type file struct { - baseDir string values *Map - watchChs map[string][]chan Event - mx sync.Mutex - wg sync.WaitGroup - done chan struct{} + wmu sync.RWMutex + notify *fsnotify.Watcher + watchers map[*watcher]struct{} } // NewFile new a config file client. @@ -36,28 +59,23 @@ type file struct { // conf = /data/conf/app/xxx.toml func NewFile(base string) (Client, error) { base = filepath.FromSlash(base) - paths, err := readAllPaths(base) + raws, err := loadValues(base) if err != nil { return nil, err } - if len(paths) == 0 { - return nil, errors.New("empty config path") - } - raws, err := loadValuesFromPaths(paths) + notify, err := fsnotify.NewWatcher() if err != nil { return nil, err } values := new(Map) values.Store(raws) - fc := &file{ - baseDir: base, + f := &file{ values: values, - watchChs: make(map[string][]chan Event), - done: make(chan struct{}, 1), + notify: notify, + watchers: make(map[*watcher]struct{}), } - fc.wg.Add(1) - go fc.daemon() - return fc, nil + go f.watch(base) + return f, nil } // Get return value by key. @@ -70,75 +88,74 @@ func (f *file) GetAll() *Map { return f.values } -// WatchEvent watch multi key. +// WatchEvent watch with the specified keys. func (f *file) WatchEvent(ctx context.Context, keys ...string) <-chan Event { - f.mx.Lock() - defer f.mx.Unlock() - ch := make(chan Event, defaultChSize) - for _, key := range keys { - f.watchChs[key] = append(f.watchChs[key], ch) - } - return ch + w := newWatcher(keys) + f.wmu.Lock() + f.watchers[w] = struct{}{} + f.wmu.Unlock() + return w.C } // Close close watcher. func (f *file) Close() error { - f.done <- struct{}{} - f.wg.Wait() + if err := f.notify.Close(); err != nil { + return err + } + f.wmu.RLock() + for w := range f.watchers { + close(w.C) + } + f.wmu.RUnlock() return nil } // file config daemon to watch file modification -func (f *file) daemon() { - defer f.wg.Done() - fswatcher, err := fsnotify.NewWatcher() - if err != nil { - log.Printf("paladin: create file watcher fail! reload function will lose efficacy error: %s", err) - return - } - if err = fswatcher.Add(f.baseDir); err != nil { - log.Printf("paladin: create fsnotify for base path %s fail %s, reload function will lose efficacy", f.baseDir, err) +func (f *file) watch(base string) { + if err := f.notify.Add(base); err != nil { + log.Printf("paladin: create fsnotify for base path %s fail %s, reload function will lose efficacy", base, err) return } - log.Printf("paladin: start watch config: %s", f.baseDir) - for event := range fswatcher.Events { + log.Printf("paladin: start watch config: %s", base) + for event := range f.notify.Events { // use vim edit config will trigger rename switch { case event.Op&fsnotify.Write == fsnotify.Write, event.Op&fsnotify.Create == fsnotify.Create: - f.reloadFile(event.Name) + if err := f.reloadFile(event.Name); err != nil { + log.Printf("paladin: load file: %s error: %s, skipped", event.Name, err) + } default: log.Printf("paladin: unsupport event %s ingored", event) } } } -func (f *file) reloadFile(name string) { +func (f *file) reloadFile(fpath string) (err error) { // NOTE: in some case immediately read file content after receive event // will get old content, sleep 100ms make sure get correct content. time.Sleep(100 * time.Millisecond) - key := filepath.Base(name) - value, err := loadValue(name) + value, err := loadValue(fpath) if err != nil { - log.Printf("paladin: load file: %s error: %s, skipped", name, err) return } + key := keyNamed(path.Base(fpath)) raws := f.values.Load() - raws[name] = value + raws[key] = value f.values.Store(raws) - f.mx.Lock() - chs := f.watchChs[key] - f.mx.Unlock() - for _, ch := range chs { - select { - case ch <- Event{Event: EventUpdate, Value: value.raw}: - default: - log.Printf("paladin: event channel full discard file %s update event", name) + f.wmu.RLock() + n := 0 + for w := range f.watchers { + if w.HasKey(key) { + n++ + w.Handle(Event{Event: EventUpdate, Key: key, Value: value.raw}) } } - log.Printf("paladin: reload config: %s notify: %d\n", name, len(chs)) + f.wmu.RUnlock() + log.Printf("paladin: reload config: %s events: %d\n", key, n) + return } -func readAllPaths(base string) ([]string, error) { +func loadValues(base string) (map[string]*Value, error) { fi, err := os.Stat(base) if err != nil { return nil, fmt.Errorf("paladin: check local config file fail! error: %s", err) @@ -157,11 +174,9 @@ func readAllPaths(base string) ([]string, error) { } else { paths = append(paths, base) } - return paths, nil -} - -func loadValuesFromPaths(paths []string) (map[string]*Value, error) { - var err error + if len(paths) == 0 { + return nil, errors.New("empty config path") + } values := make(map[string]*Value, len(paths)) for _, fpath := range paths { if values[path.Base(fpath)], err = loadValue(fpath); err != nil { @@ -171,8 +186,8 @@ func loadValuesFromPaths(paths []string) (map[string]*Value, error) { return values, nil } -func loadValue(fpath string) (*Value, error) { - data, err := ioutil.ReadFile(fpath) +func loadValue(name string) (*Value, error) { + data, err := ioutil.ReadFile(name) if err != nil { return nil, err }