Merge pull request #438 from bilibili/common/update-paladin

update paladin
pull/484/head
cheng wei 5 years ago committed by GitHub
commit 43a13f6aae
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 233
      pkg/conf/paladin/file.go
  2. 38
      pkg/conf/paladin/file_test.go

@ -2,80 +2,122 @@ package paladin
import ( import (
"context" "context"
"errors"
"fmt" "fmt"
"io/ioutil" "io/ioutil"
"log" "log"
"os" "os"
"path" "path"
"path/filepath" "path/filepath"
"strings"
"sync" "sync"
"time" "time"
"github.com/fsnotify/fsnotify" "github.com/fsnotify/fsnotify"
) )
const (
defaultChSize = 10
)
var _ Client = &file{} var _ Client = &file{}
type watcher struct { // file is file config client.
keys []string type file struct {
C chan Event values *Map
rawVal map[string]*Value
watchChs map[string][]chan Event
mx sync.Mutex
wg sync.WaitGroup
base string
done chan struct{}
} }
func newWatcher(keys []string) *watcher { func isHiddenFile(name string) bool {
return &watcher{keys: keys, C: make(chan Event, 5)} // TODO: support windows.
return strings.HasPrefix(filepath.Base(name), ".")
} }
func (w *watcher) HasKey(key string) bool { func readAllPaths(base string) ([]string, error) {
if len(w.keys) == 0 { fi, err := os.Stat(base)
return true if err != nil {
return nil, fmt.Errorf("check local config file fail! error: %s", err)
}
// dirs or file to paths
var paths []string
if fi.IsDir() {
files, err := ioutil.ReadDir(base)
if err != nil {
return nil, fmt.Errorf("read dir %s error: %s", base, err)
} }
for _, k := range w.keys { for _, file := range files {
if KeyNamed(k) == key { if !file.IsDir() && !isHiddenFile(file.Name()) {
return true paths = append(paths, path.Join(base, file.Name()))
} }
} }
return false } else {
paths = append(paths, base)
}
return paths, nil
} }
func (w *watcher) Handle(event Event) { func loadValuesFromPaths(paths []string) (map[string]*Value, error) {
select { // laod config file to values
case w.C <- event: var err error
default: values := make(map[string]*Value, len(paths))
log.Printf("paladin: event channel full discard file %s update event", event.Key) for _, fpath := range paths {
if values[path.Base(fpath)], err = loadValue(fpath); err != nil {
return nil, err
} }
} }
return values, nil
}
// file is file config client. func loadValue(fpath string) (*Value, error) {
type file struct { data, err := ioutil.ReadFile(fpath)
values *Map if err != nil {
wmu sync.RWMutex return nil, err
notify *fsnotify.Watcher }
watchers map[*watcher]struct{} content := string(data)
return &Value{val: content, raw: content}, nil
} }
// NewFile new a config file client. // NewFile new a config file client.
// conf = /data/conf/app/ // conf = /data/conf/app/
// conf = /data/conf/app/xxx.toml // conf = /data/conf/app/xxx.toml
func NewFile(base string) (Client, error) { func NewFile(base string) (Client, error) {
// paltform slash
base = filepath.FromSlash(base) base = filepath.FromSlash(base)
raws, err := loadValues(base)
paths, err := readAllPaths(base)
if err != nil { if err != nil {
return nil, err return nil, err
} }
notify, err := fsnotify.NewWatcher() if len(paths) == 0 {
return nil, fmt.Errorf("empty config path")
}
rawVal, err := loadValuesFromPaths(paths)
if err != nil { if err != nil {
return nil, err return nil, err
} }
values := new(Map)
values.Store(raws) valMap := &Map{}
f := &file{ valMap.Store(rawVal)
values: values, fc := &file{
notify: notify, values: valMap,
watchers: make(map[*watcher]struct{}), rawVal: rawVal,
watchChs: make(map[string][]chan Event),
base: base,
done: make(chan struct{}, 1),
} }
go f.watchproc(base)
return f, nil fc.wg.Add(1)
go fc.daemon()
return fc, nil
} }
// Get return value by key. // Get return value by key.
@ -88,109 +130,74 @@ func (f *file) GetAll() *Map {
return f.values return f.values
} }
// WatchEvent watch with the specified keys. // WatchEvent watch multi key.
func (f *file) WatchEvent(ctx context.Context, keys ...string) <-chan Event { func (f *file) WatchEvent(ctx context.Context, keys ...string) <-chan Event {
w := newWatcher(keys) f.mx.Lock()
f.wmu.Lock() defer f.mx.Unlock()
f.watchers[w] = struct{}{} ch := make(chan Event, defaultChSize)
f.wmu.Unlock() for _, key := range keys {
return w.C f.watchChs[key] = append(f.watchChs[key], ch)
}
return ch
} }
// Close close watcher. // Close close watcher.
func (f *file) Close() error { func (f *file) Close() error {
if err := f.notify.Close(); err != nil { f.done <- struct{}{}
return err f.wg.Wait()
}
f.wmu.RLock()
for w := range f.watchers {
close(w.C)
}
f.wmu.RUnlock()
return nil return nil
} }
// file config daemon to watch file modification // file config daemon to watch file modification
func (f *file) watchproc(base string) { func (f *file) daemon() {
if err := f.notify.Add(base); err != nil { defer f.wg.Done()
log.Printf("paladin: create fsnotify for base path %s fail %s, reload function will lose efficacy", base, err) fswatcher, err := fsnotify.NewWatcher()
if err != nil {
log.Printf("create file watcher fail! reload function will lose efficacy error: %s", err)
return return
} }
log.Printf("paladin: start watch config: %s", base) if err = fswatcher.Add(f.base); err != nil {
for event := range f.notify.Events { log.Printf("create fsnotify for base path %s fail %s, reload function will lose efficacy", f.base, err)
// use vim edit config will trigger rename return
switch {
case event.Op&fsnotify.Write == fsnotify.Write, event.Op&fsnotify.Create == fsnotify.Create:
if err := f.reloadFile(event.Name); err != nil {
log.Printf("paladin: load file: %s error: %s, skipped", event.Name, err)
} }
log.Printf("start watch filepath: %s", f.base)
for event := range fswatcher.Events {
switch event.Op {
// use vim edit config will trigger rename
case fsnotify.Write, fsnotify.Create:
f.reloadFile(event.Name)
case fsnotify.Chmod:
default: default:
log.Printf("paladin: unsupport event %s ingored", event) log.Printf("unsupport event %s ingored", event)
} }
} }
} }
func (f *file) reloadFile(fpath string) (err error) { func (f *file) reloadFile(name string) {
if isHiddenFile(name) {
return
}
// NOTE: in some case immediately read file content after receive event // NOTE: in some case immediately read file content after receive event
// will get old content, sleep 100ms make sure get correct content. // will get old content, sleep 100ms make sure get correct content.
time.Sleep(100 * time.Millisecond) time.Sleep(100 * time.Millisecond)
value, err := loadValue(fpath) key := filepath.Base(name)
val, err := loadValue(name)
if err != nil { if err != nil {
log.Printf("load file %s error: %s, skipped", name, err)
return return
} }
key := KeyNamed(path.Base(fpath)) f.rawVal[key] = val
raws := f.values.Load() f.values.Store(f.rawVal)
raws[key] = value
f.values.Store(raws)
f.wmu.RLock()
n := 0
for w := range f.watchers {
if w.HasKey(key) {
n++
w.Handle(Event{Event: EventUpdate, Key: key, Value: value.raw})
}
}
f.wmu.RUnlock()
log.Printf("paladin: reload config: %s events: %d\n", key, n)
return
}
func loadValues(base string) (map[string]*Value, error) { f.mx.Lock()
fi, err := os.Stat(base) chs := f.watchChs[key]
if err != nil { f.mx.Unlock()
return nil, fmt.Errorf("paladin: check local config file fail! error: %s", err)
}
var paths []string
if fi.IsDir() {
files, err := ioutil.ReadDir(base)
if err != nil {
return nil, fmt.Errorf("paladin: read dir %s error: %s", base, err)
}
for _, file := range files {
if !file.IsDir() && (file.Mode()&os.ModeSymlink) != os.ModeSymlink {
paths = append(paths, path.Join(base, file.Name()))
}
}
} else {
paths = append(paths, base)
}
if len(paths) == 0 {
return nil, errors.New("empty config path")
}
values := make(map[string]*Value, len(paths))
for _, fpath := range paths {
if values[path.Base(fpath)], err = loadValue(fpath); err != nil {
return nil, err
}
}
return values, nil
}
func loadValue(name string) (*Value, error) { for _, ch := range chs {
data, err := ioutil.ReadFile(name) select {
if err != nil { case ch <- Event{Event: EventUpdate, Value: val.raw}:
return nil, err default:
log.Printf("event channel full discard file %s update event", name)
}
} }
content := string(data)
return &Value{val: content, raw: content}, nil
} }

@ -82,9 +82,8 @@ func TestFileEvent(t *testing.T) {
cli, err := NewFile(path) cli, err := NewFile(path)
assert.Nil(t, err) assert.Nil(t, err)
assert.NotNil(t, cli) assert.NotNil(t, cli)
time.Sleep(time.Millisecond * 100)
ch := cli.WatchEvent(context.Background(), "test.toml", "abc.toml") ch := cli.WatchEvent(context.Background(), "test.toml", "abc.toml")
time.Sleep(time.Millisecond * 100) time.Sleep(time.Millisecond)
ioutil.WriteFile(path+"test.toml", []byte(`hello`), 0644) ioutil.WriteFile(path+"test.toml", []byte(`hello`), 0644)
timeout := time.NewTimer(time.Second) timeout := time.NewTimer(time.Second)
select { select {
@ -94,4 +93,39 @@ func TestFileEvent(t *testing.T) {
assert.Equal(t, EventUpdate, ev.Event) assert.Equal(t, EventUpdate, ev.Event)
assert.Equal(t, "hello", ev.Value) assert.Equal(t, "hello", ev.Value)
} }
ioutil.WriteFile(path+"abc.toml", []byte(`test`), 0644)
select {
case <-timeout.C:
t.Fatalf("run test timeout")
case ev := <-ch:
assert.Equal(t, EventUpdate, ev.Event)
assert.Equal(t, "test", ev.Value)
}
content1, _ := cli.Get("test.toml").String()
assert.Equal(t, "hello", content1)
content2, _ := cli.Get("abc.toml").String()
assert.Equal(t, "test", content2)
}
func TestHiddenFile(t *testing.T) {
path := "/tmp/test_hidden_event/"
assert.Nil(t, os.MkdirAll(path, 0700))
assert.Nil(t, ioutil.WriteFile(path+"test.toml", []byte(`hello`), 0644))
assert.Nil(t, ioutil.WriteFile(path+".abc.toml", []byte(`
text = "hello"
number = 100
`), 0644))
// test client
// test client
cli, err := NewFile(path)
assert.Nil(t, err)
assert.NotNil(t, cli)
cli.WatchEvent(context.Background(), "test.toml")
time.Sleep(time.Millisecond)
ioutil.WriteFile(path+".abc.toml", []byte(`hello`), 0644)
time.Sleep(time.Second)
content1, _ := cli.Get("test.toml").String()
assert.Equal(t, "hello", content1)
_, err = cli.Get(".abc.toml").String()
assert.NotNil(t, err)
} }

Loading…
Cancel
Save