feat_health_check
haiyux 3 years ago
parent 2f74c9dfae
commit 49f700124b
  1. 2
      app.go
  2. 39
      health/health.go

@ -116,7 +116,7 @@ func (a *App) Run() error {
if a.opts.registrar != nil { if a.opts.registrar != nil {
rctx, rcancel := context.WithTimeout(a.opts.ctx, a.opts.registrarTimeout) rctx, rcancel := context.WithTimeout(a.opts.ctx, a.opts.registrarTimeout)
defer rcancel() defer rcancel()
if err := a.opts.registrar.Register(rctx, instance); err != nil { if err = a.opts.registrar.Register(rctx, instance); err != nil {
return err return err
} }
a.lk.Lock() a.lk.Lock()

@ -9,13 +9,11 @@ import (
) )
type Health struct { type Health struct {
mutex sync.RWMutex
// key: service name, type is string // key: service name, type is string
// value: service Status type is Status // value: service Status type is Status
statusMap sync.Map statusMap sync.Map
// key: service name, type is string updates map[string]map[string]chan Status
// value: update, type is sync.Map
// update: key is id, type is string, value: chan Status
updates sync.Map
ticker *time.Ticker ticker *time.Ticker
} }
@ -28,6 +26,7 @@ func New(opts ...Option) *Health {
} }
h := &Health{ h := &Health{
ticker: time.NewTicker(option.watchTime), ticker: time.NewTicker(option.watchTime),
updates: map[string]map[string]chan Status{},
} }
return h return h
} }
@ -45,11 +44,9 @@ func (h *Health) SetStatus(service string, status Status) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
defer cancel() defer cancel()
eg, ctx := errgroup.WithContext(ctx) eg, ctx := errgroup.WithContext(ctx)
u, _ := h.updates.LoadOrStore(service, sync.Map{}) h.mutex.RLock()
update := u.(sync.Map) for _, w := range h.updates[service] {
ch := w
update.Range(func(key, value interface{}) bool {
ch := value.(chan Status)
eg.Go(func() error { eg.Go(func() error {
select { select {
case ch <- status: case ch <- status:
@ -58,23 +55,29 @@ func (h *Health) SetStatus(service string, status Status) error {
} }
return nil return nil
}) })
return true }
}) h.mutex.RUnlock()
return eg.Wait() return eg.Wait()
} }
func (h *Health) Update(service string, id string) chan Status { func (h *Health) Update(service string, id string) chan Status {
u, _ := h.updates.LoadOrStore(service, sync.Map{}) h.mutex.RLock()
update := u.(sync.Map) defer h.mutex.RUnlock()
ch, _ := update.LoadOrStore(id, make(chan Status, 1)) if _, ok := h.updates[service]; !ok {
return ch.(chan Status) h.updates[service] = make(map[string]chan Status)
}
if _, ok := h.updates[service][id]; !ok {
h.updates[service][id] = make(chan Status, 1)
}
return h.updates[service][id]
} }
func (h *Health) DelUpdate(service string, id string) { func (h *Health) DelUpdate(service string, id string) {
if u, ok := h.updates.Load(service); ok { h.mutex.Lock()
update := u.(sync.Map) defer h.mutex.Unlock()
update.Delete(id) if _, ok := h.updates[service]; ok {
delete(h.updates[service], id)
} }
} }

Loading…
Cancel
Save