You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
137 lines
2.8 KiB
137 lines
2.8 KiB
3 years ago
|
package eureka
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"strings"
|
||
|
"sync"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
type subscriber struct {
|
||
|
appID string
|
||
|
callBack func()
|
||
|
}
|
||
|
|
||
|
type API struct {
|
||
|
cli *Client
|
||
|
allInstances map[string][]Instance
|
||
|
subscribers map[string]*subscriber
|
||
|
refreshInterval time.Duration
|
||
|
lock sync.Mutex
|
||
|
}
|
||
|
|
||
|
func NewAPI(ctx context.Context, client *Client, refreshInterval time.Duration) *API {
|
||
|
e := &API{
|
||
|
cli: client,
|
||
|
allInstances: make(map[string][]Instance),
|
||
|
subscribers: make(map[string]*subscriber),
|
||
|
refreshInterval: refreshInterval,
|
||
|
}
|
||
|
|
||
|
// 首次广播一次
|
||
|
e.broadcast()
|
||
|
|
||
|
go e.refresh(ctx)
|
||
|
|
||
|
return e
|
||
|
}
|
||
|
|
||
|
func (e *API) refresh(ctx context.Context) {
|
||
|
ticker := time.NewTicker(e.refreshInterval)
|
||
|
defer ticker.Stop()
|
||
|
for {
|
||
|
select {
|
||
|
case <-ctx.Done():
|
||
|
return
|
||
|
case <-ticker.C:
|
||
|
e.broadcast()
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (e *API) broadcast() {
|
||
|
instances := e.cacheAllInstances()
|
||
|
if instances == nil {
|
||
|
return
|
||
|
}
|
||
|
|
||
|
for _, subscriber := range e.subscribers {
|
||
|
go subscriber.callBack()
|
||
|
}
|
||
|
defer e.lock.Unlock()
|
||
|
e.lock.Lock()
|
||
|
e.allInstances = instances
|
||
|
}
|
||
|
|
||
|
func (e *API) cacheAllInstances() map[string][]Instance {
|
||
|
items := make(map[string][]Instance)
|
||
|
instances := e.cli.FetchAllUpInstances(context.Background())
|
||
|
for _, instance := range instances {
|
||
|
items[e.ToAppID(instance.App)] = append(items[instance.App], instance)
|
||
|
}
|
||
|
|
||
|
return items
|
||
|
}
|
||
|
|
||
|
func (e *API) Register(ctx context.Context, serviceName string, endpoints ...Endpoint) error {
|
||
|
appID := e.ToAppID(serviceName)
|
||
|
upInstances := make(map[string]struct{})
|
||
|
|
||
|
for _, ins := range e.GetService(ctx, appID) {
|
||
|
upInstances[ins.InstanceID] = struct{}{}
|
||
|
}
|
||
|
|
||
|
for _, ep := range endpoints {
|
||
|
if _, ok := upInstances[ep.InstanceID]; !ok {
|
||
|
if err := e.cli.Register(ctx, ep); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
go e.cli.Heartbeat(ep)
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Deregister 中的ctx 和 register ctx 是同一个
|
||
|
func (e *API) Deregister(ctx context.Context, endpoints []Endpoint) error {
|
||
|
for _, ep := range endpoints {
|
||
|
if err := e.cli.Deregister(ctx, ep.AppID, ep.InstanceID); err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (e *API) Subscribe(serverName string, fn func()) error {
|
||
|
e.lock.Lock()
|
||
|
defer e.lock.Unlock()
|
||
|
appID := e.ToAppID(serverName)
|
||
|
e.subscribers[appID] = &subscriber{
|
||
|
appID: appID,
|
||
|
callBack: fn,
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func (e *API) GetService(ctx context.Context, serverName string) []Instance {
|
||
|
appID := e.ToAppID(serverName)
|
||
|
if ins, ok := e.allInstances[appID]; ok {
|
||
|
return ins
|
||
|
}
|
||
|
|
||
|
// 如果不再allinstances 中可以尝试再单独获取一次
|
||
|
return e.cli.FetchAppUpInstances(ctx, appID)
|
||
|
}
|
||
|
|
||
|
func (e *API) Unsubscribe(serverName string) {
|
||
|
e.lock.Lock()
|
||
|
defer e.lock.Unlock()
|
||
|
delete(e.subscribers, e.ToAppID(serverName))
|
||
|
}
|
||
|
|
||
|
func (e *API) ToAppID(serverName string) string {
|
||
|
return strings.ToUpper(serverName)
|
||
|
}
|