add file watcher

pull/93/head
Tony 6 years ago
parent 0968936285
commit fa32988c6f
  1. 145
      pkg/conf/paladin/file.go

@ -15,20 +15,43 @@ import (
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
) )
const (
defaultChSize = 10
)
var _ Client = &file{} var _ Client = &file{}
type watcher struct {
keys []string
C chan Event
}
func newWatcher(keys []string) *watcher {
return &watcher{keys: keys, C: make(chan Event, 5)}
}
func (w *watcher) HasKey(key string) bool {
if len(w.keys) == 0 {
return true
}
for _, k := range w.keys {
if keyNamed(k) == key {
return true
}
}
return false
}
func (w *watcher) Handle(event Event) {
select {
case w.C <- event:
default:
log.Printf("paladin: event channel full discard file %s update event", event.Key)
}
}
// file is file config client. // file is file config client.
type file struct { type file struct {
baseDir string
values *Map values *Map
watchChs map[string][]chan Event wmu sync.RWMutex
mx sync.Mutex notify *fsnotify.Watcher
wg sync.WaitGroup watchers map[*watcher]struct{}
done chan struct{}
} }
// NewFile new a config file client. // NewFile new a config file client.
@ -36,28 +59,23 @@ type file struct {
// conf = /data/conf/app/xxx.toml // conf = /data/conf/app/xxx.toml
func NewFile(base string) (Client, error) { func NewFile(base string) (Client, error) {
base = filepath.FromSlash(base) base = filepath.FromSlash(base)
paths, err := readAllPaths(base) raws, err := loadValues(base)
if err != nil { if err != nil {
return nil, err return nil, err
} }
if len(paths) == 0 { notify, err := fsnotify.NewWatcher()
return nil, errors.New("empty config path")
}
raws, err := loadValuesFromPaths(paths)
if err != nil { if err != nil {
return nil, err return nil, err
} }
values := new(Map) values := new(Map)
values.Store(raws) values.Store(raws)
fc := &file{ f := &file{
baseDir: base,
values: values, values: values,
watchChs: make(map[string][]chan Event), notify: notify,
done: make(chan struct{}, 1), watchers: make(map[*watcher]struct{}),
} }
fc.wg.Add(1) go f.watch(base)
go fc.daemon() return f, nil
return fc, nil
} }
// Get return value by key. // Get return value by key.
@ -70,75 +88,74 @@ func (f *file) GetAll() *Map {
return f.values return f.values
} }
// WatchEvent watch multi key. // WatchEvent watch with the specified keys.
func (f *file) WatchEvent(ctx context.Context, keys ...string) <-chan Event { func (f *file) WatchEvent(ctx context.Context, keys ...string) <-chan Event {
f.mx.Lock() w := newWatcher(keys)
defer f.mx.Unlock() f.wmu.Lock()
ch := make(chan Event, defaultChSize) f.watchers[w] = struct{}{}
for _, key := range keys { f.wmu.Unlock()
f.watchChs[key] = append(f.watchChs[key], ch) return w.C
}
return ch
} }
// Close close watcher. // Close close watcher.
func (f *file) Close() error { func (f *file) Close() error {
f.done <- struct{}{} if err := f.notify.Close(); err != nil {
f.wg.Wait() return err
}
f.wmu.RLock()
for w := range f.watchers {
close(w.C)
}
f.wmu.RUnlock()
return nil return nil
} }
// file config daemon to watch file modification // file config daemon to watch file modification
func (f *file) daemon() { func (f *file) watch(base string) {
defer f.wg.Done() if err := f.notify.Add(base); err != nil {
fswatcher, err := fsnotify.NewWatcher() log.Printf("paladin: create fsnotify for base path %s fail %s, reload function will lose efficacy", base, err)
if err != nil {
log.Printf("paladin: create file watcher fail! reload function will lose efficacy error: %s", err)
return
}
if err = fswatcher.Add(f.baseDir); err != nil {
log.Printf("paladin: create fsnotify for base path %s fail %s, reload function will lose efficacy", f.baseDir, err)
return return
} }
log.Printf("paladin: start watch config: %s", f.baseDir) log.Printf("paladin: start watch config: %s", base)
for event := range fswatcher.Events { for event := range f.notify.Events {
// use vim edit config will trigger rename // use vim edit config will trigger rename
switch { switch {
case event.Op&fsnotify.Write == fsnotify.Write, event.Op&fsnotify.Create == fsnotify.Create: case event.Op&fsnotify.Write == fsnotify.Write, event.Op&fsnotify.Create == fsnotify.Create:
f.reloadFile(event.Name) if err := f.reloadFile(event.Name); err != nil {
log.Printf("paladin: load file: %s error: %s, skipped", event.Name, err)
}
default: default:
log.Printf("paladin: unsupport event %s ingored", event) log.Printf("paladin: unsupport event %s ingored", event)
} }
} }
} }
func (f *file) reloadFile(name string) { func (f *file) reloadFile(fpath string) (err error) {
// NOTE: in some case immediately read file content after receive event // NOTE: in some case immediately read file content after receive event
// will get old content, sleep 100ms make sure get correct content. // will get old content, sleep 100ms make sure get correct content.
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
key := filepath.Base(name) value, err := loadValue(fpath)
value, err := loadValue(name)
if err != nil { if err != nil {
log.Printf("paladin: load file: %s error: %s, skipped", name, err)
return return
} }
key := keyNamed(path.Base(fpath))
raws := f.values.Load() raws := f.values.Load()
raws[name] = value raws[key] = value
f.values.Store(raws) f.values.Store(raws)
f.mx.Lock() f.wmu.RLock()
chs := f.watchChs[key] n := 0
f.mx.Unlock() for w := range f.watchers {
for _, ch := range chs { if w.HasKey(key) {
select { n++
case ch <- Event{Event: EventUpdate, Value: value.raw}: w.Handle(Event{Event: EventUpdate, Key: key, Value: value.raw})
default:
log.Printf("paladin: event channel full discard file %s update event", name)
} }
} }
log.Printf("paladin: reload config: %s notify: %d\n", name, len(chs)) f.wmu.RUnlock()
log.Printf("paladin: reload config: %s events: %d\n", key, n)
return
} }
func readAllPaths(base string) ([]string, error) { func loadValues(base string) (map[string]*Value, error) {
fi, err := os.Stat(base) fi, err := os.Stat(base)
if err != nil { if err != nil {
return nil, fmt.Errorf("paladin: check local config file fail! error: %s", err) return nil, fmt.Errorf("paladin: check local config file fail! error: %s", err)
@ -157,11 +174,9 @@ func readAllPaths(base string) ([]string, error) {
} else { } else {
paths = append(paths, base) paths = append(paths, base)
} }
return paths, nil if len(paths) == 0 {
} return nil, errors.New("empty config path")
}
func loadValuesFromPaths(paths []string) (map[string]*Value, error) {
var err error
values := make(map[string]*Value, len(paths)) values := make(map[string]*Value, len(paths))
for _, fpath := range paths { for _, fpath := range paths {
if values[path.Base(fpath)], err = loadValue(fpath); err != nil { if values[path.Base(fpath)], err = loadValue(fpath); err != nil {
@ -171,8 +186,8 @@ func loadValuesFromPaths(paths []string) (map[string]*Value, error) {
return values, nil return values, nil
} }
func loadValue(fpath string) (*Value, error) { func loadValue(name string) (*Value, error) {
data, err := ioutil.ReadFile(fpath) data, err := ioutil.ReadFile(name)
if err != nil { if err != nil {
return nil, err return nil, err
} }

Loading…
Cancel
Save