diff --git a/health/checker.go b/health/checker.go index 0206d0c81..0fedb59db 100644 --- a/health/checker.go +++ b/health/checker.go @@ -10,7 +10,9 @@ type Checker interface { Check(ctx context.Context) (interface{}, error) } -type Watcher func(string) +type Watcher interface { + Watch(string) +} type checker struct { Name string @@ -18,25 +20,43 @@ type checker struct { timeout time.Duration Checker CheckerStatus - sync.RWMutex + *sync.RWMutex Watcher } -func NewChecker(name string, checker Checker) { +func NewChecker(name string, ch Checker, interval, timeout time.Duration) *checker { + return &checker{ + Name: name, + intervalTime: interval, + timeout: timeout, + Checker: ch, + CheckerStatus: CheckerStatus{}, + RWMutex: &sync.RWMutex{}, + Watcher: nil, + } +} +func (c *checker) setWatcher(w Watcher) { + c.Watcher = w } func (c *checker) check(ctx context.Context) bool { defer func() { recover() }() - ctx, cancel := context.WithTimeout(ctx, c.timeout) - defer cancel() + + var cancel func() + if c.timeout > 0 { + ctx, cancel = context.WithTimeout(ctx, c.timeout) + defer cancel() + } + detail, err := c.Check(ctx) status := StatusUp if err != nil { status = StatusDown } + c.Lock() defer c.Unlock() old := c.CheckerStatus @@ -59,8 +79,10 @@ func (c *checker) run(ctx context.Context) { default: } if c.check(ctx) { - //发送改变通知 - c.Watcher(c.Name) + //notify + if c.Watcher != nil { + c.Watcher.Watch(c.Name) + } } time.Sleep(c.intervalTime) } diff --git a/health/health.go b/health/health.go index 7484333a5..b80553aa7 100644 --- a/health/health.go +++ b/health/health.go @@ -1,35 +1,43 @@ package health import ( - "golang.org/x/net/context" + "context" + "sync" ) type CheckerMgr struct { - checkers map[string]checker - ctx context.Context - cancel func() - watchers []chan string + checkers map[string]*checker + ctx context.Context + cancel func() + watchers map[uint64]chan string + watcherID uint64 + + lock sync.RWMutex } func New(ctx context.Context) *CheckerMgr { c, cancel := context.WithCancel(ctx) return &CheckerMgr{ - checkers: make(map[string]checker), + checkers: make(map[string]*checker), ctx: c, cancel: cancel, + lock: sync.RWMutex{}, + watchers: map[uint64]chan string{}, } } func (c *CheckerMgr) Start() { for _, v := range c.checkers { + cv := v go func() { - v.run(c.ctx) + cv.run(c.ctx) }() } } func (c *CheckerMgr) Stop() { c.cancel() + c.closeAllWatcher() } type StatusResult struct { @@ -61,3 +69,65 @@ func (c *CheckerMgr) GetStatus(name ...string) []StatusResult { } return status } + +func (c *CheckerMgr) RegisterChecker(checker2 *checker) { + c.checkers[checker2.Name] = checker2 + checker2.setWatcher(c) +} + +func (c *CheckerMgr) Watch(name string) { + c.lock.RLock() + defer c.lock.RUnlock() + for _, ch := range c.watchers { + select { + case ch <- name: + default: + } + } +} + +type WatcherResult struct { + id uint64 + Ch <-chan string + c *CheckerMgr +} + +func (w *WatcherResult) Close() { + w.c.closeWatcher(w.id) +} + +func (c *CheckerMgr) NewWatcher() WatcherResult { + c.lock.Lock() + wID := c.watcherID + c.watcherID++ + ch := make(chan string, 1) + c.watchers[wID] = ch + c.lock.Unlock() + return WatcherResult{ + id: wID, + Ch: ch, + c: c, + } +} + +func (c *CheckerMgr) closeWatcher(wID uint64) { + c.lock.Lock() + defer c.lock.Unlock() + + ch, ok := c.watchers[wID] + if !ok { + return + } + close(ch) + delete(c.watchers, wID) +} + +func (c *CheckerMgr) closeAllWatcher() { + c.lock.Lock() + defer c.lock.Unlock() + + for k, v := range c.watchers { + close(v) + delete(c.watchers, k) + } +} diff --git a/health/health.proto b/health/health.proto new file mode 100644 index 000000000..7b4d5a4df --- /dev/null +++ b/health/health.proto @@ -0,0 +1,5 @@ +syntax="proto3"; + +package kratos.health; + +option go_package = "github.com/go-kratos/kratos/v2/api/proto/kratos/api;metadata"; \ No newline at end of file diff --git a/health/health_test.go b/health/health_test.go new file mode 100644 index 000000000..f900ad235 --- /dev/null +++ b/health/health_test.go @@ -0,0 +1,52 @@ +package health + +import ( + "fmt" + "golang.org/x/net/context" + "math/rand" + "testing" + "time" +) + +type A struct { +} + +func (A) Check(ctx context.Context) (interface{}, error) { + fmt.Println("check A") + if rand.Int()%2 == 0 { + return "出错A", fmt.Errorf("错误:%s", "123") + } + return "正常A", nil +} + +type B struct { +} + +func (B) Check(ctx context.Context) (interface{}, error) { + fmt.Println("check B") + if rand.Int()%2 == 0 { + return "出错B", fmt.Errorf("错误:%s", "123B") + } + return "正常B", nil +} + +func TestNew(t *testing.T) { + ctx, cancel := context.WithCancel(context.TODO()) + cm := New(ctx) + cm.RegisterChecker(NewChecker("A", A{}, 0, time.Second*10)) + cm.RegisterChecker(NewChecker("B", B{}, 0, time.Second*10)) + cm.Start() + go func() { + + s := cm.GetStatus() + fmt.Println("----", s) + w := cm.NewWatcher() + defer w.Close() + for i := range w.Ch { + fmt.Println("---", cm.GetStatus(i)) + } + + }() + time.Sleep(time.Second * 100) + cancel() +}