pull/2481/head
FengbinShi 2 years ago
parent 42a18320b1
commit 639d47e89f
  1. 36
      health/checker.go
  2. 84
      health/health.go
  3. 5
      health/health.proto
  4. 52
      health/health_test.go

@ -10,7 +10,9 @@ type Checker interface {
Check(ctx context.Context) (interface{}, error) Check(ctx context.Context) (interface{}, error)
} }
type Watcher func(string) type Watcher interface {
Watch(string)
}
type checker struct { type checker struct {
Name string Name string
@ -18,25 +20,43 @@ type checker struct {
timeout time.Duration timeout time.Duration
Checker Checker
CheckerStatus CheckerStatus
sync.RWMutex *sync.RWMutex
Watcher Watcher
} }
func NewChecker(name string, checker Checker) { func NewChecker(name string, ch Checker, interval, timeout time.Duration) *checker {
return &checker{
Name: name,
intervalTime: interval,
timeout: timeout,
Checker: ch,
CheckerStatus: CheckerStatus{},
RWMutex: &sync.RWMutex{},
Watcher: nil,
}
}
func (c *checker) setWatcher(w Watcher) {
c.Watcher = w
} }
func (c *checker) check(ctx context.Context) bool { func (c *checker) check(ctx context.Context) bool {
defer func() { defer func() {
recover() recover()
}() }()
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel() var cancel func()
if c.timeout > 0 {
ctx, cancel = context.WithTimeout(ctx, c.timeout)
defer cancel()
}
detail, err := c.Check(ctx) detail, err := c.Check(ctx)
status := StatusUp status := StatusUp
if err != nil { if err != nil {
status = StatusDown status = StatusDown
} }
c.Lock() c.Lock()
defer c.Unlock() defer c.Unlock()
old := c.CheckerStatus old := c.CheckerStatus
@ -59,8 +79,10 @@ func (c *checker) run(ctx context.Context) {
default: default:
} }
if c.check(ctx) { if c.check(ctx) {
//发送改变通知 //notify
c.Watcher(c.Name) if c.Watcher != nil {
c.Watcher.Watch(c.Name)
}
} }
time.Sleep(c.intervalTime) time.Sleep(c.intervalTime)
} }

@ -1,35 +1,43 @@
package health package health
import ( import (
"golang.org/x/net/context" "context"
"sync"
) )
type CheckerMgr struct { type CheckerMgr struct {
checkers map[string]checker checkers map[string]*checker
ctx context.Context ctx context.Context
cancel func() cancel func()
watchers []chan string watchers map[uint64]chan string
watcherID uint64
lock sync.RWMutex
} }
func New(ctx context.Context) *CheckerMgr { func New(ctx context.Context) *CheckerMgr {
c, cancel := context.WithCancel(ctx) c, cancel := context.WithCancel(ctx)
return &CheckerMgr{ return &CheckerMgr{
checkers: make(map[string]checker), checkers: make(map[string]*checker),
ctx: c, ctx: c,
cancel: cancel, cancel: cancel,
lock: sync.RWMutex{},
watchers: map[uint64]chan string{},
} }
} }
func (c *CheckerMgr) Start() { func (c *CheckerMgr) Start() {
for _, v := range c.checkers { for _, v := range c.checkers {
cv := v
go func() { go func() {
v.run(c.ctx) cv.run(c.ctx)
}() }()
} }
} }
func (c *CheckerMgr) Stop() { func (c *CheckerMgr) Stop() {
c.cancel() c.cancel()
c.closeAllWatcher()
} }
type StatusResult struct { type StatusResult struct {
@ -61,3 +69,65 @@ func (c *CheckerMgr) GetStatus(name ...string) []StatusResult {
} }
return status return status
} }
func (c *CheckerMgr) RegisterChecker(checker2 *checker) {
c.checkers[checker2.Name] = checker2
checker2.setWatcher(c)
}
func (c *CheckerMgr) Watch(name string) {
c.lock.RLock()
defer c.lock.RUnlock()
for _, ch := range c.watchers {
select {
case ch <- name:
default:
}
}
}
type WatcherResult struct {
id uint64
Ch <-chan string
c *CheckerMgr
}
func (w *WatcherResult) Close() {
w.c.closeWatcher(w.id)
}
func (c *CheckerMgr) NewWatcher() WatcherResult {
c.lock.Lock()
wID := c.watcherID
c.watcherID++
ch := make(chan string, 1)
c.watchers[wID] = ch
c.lock.Unlock()
return WatcherResult{
id: wID,
Ch: ch,
c: c,
}
}
func (c *CheckerMgr) closeWatcher(wID uint64) {
c.lock.Lock()
defer c.lock.Unlock()
ch, ok := c.watchers[wID]
if !ok {
return
}
close(ch)
delete(c.watchers, wID)
}
func (c *CheckerMgr) closeAllWatcher() {
c.lock.Lock()
defer c.lock.Unlock()
for k, v := range c.watchers {
close(v)
delete(c.watchers, k)
}
}

@ -0,0 +1,5 @@
syntax="proto3";
package kratos.health;
option go_package = "github.com/go-kratos/kratos/v2/api/proto/kratos/api;metadata";

@ -0,0 +1,52 @@
package health
import (
"fmt"
"golang.org/x/net/context"
"math/rand"
"testing"
"time"
)
type A struct {
}
func (A) Check(ctx context.Context) (interface{}, error) {
fmt.Println("check A")
if rand.Int()%2 == 0 {
return "出错A", fmt.Errorf("错误:%s", "123")
}
return "正常A", nil
}
type B struct {
}
func (B) Check(ctx context.Context) (interface{}, error) {
fmt.Println("check B")
if rand.Int()%2 == 0 {
return "出错B", fmt.Errorf("错误:%s", "123B")
}
return "正常B", nil
}
func TestNew(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
cm := New(ctx)
cm.RegisterChecker(NewChecker("A", A{}, 0, time.Second*10))
cm.RegisterChecker(NewChecker("B", B{}, 0, time.Second*10))
cm.Start()
go func() {
s := cm.GetStatus()
fmt.Println("----", s)
w := cm.NewWatcher()
defer w.Close()
for i := range w.Ch {
fmt.Println("---", cm.GetStatus(i))
}
}()
time.Sleep(time.Second * 100)
cancel()
}
Loading…
Cancel
Save