package consul import ( "context" "fmt" "sync" "sync/atomic" "time" "github.com/go-kratos/kratos/v2/registry" "github.com/hashicorp/consul/api" ) var ( _ registry.Registrar = &Registry{} _ registry.Discovery = &Registry{} ) // Option is consul registry option. type Option func(*Registry) // WithHealthCheck with registry health check option. func WithHealthCheck(enable bool) Option { return func(o *Registry) { o.enableHealthCheck = enable } } // WithHeartbeat enable or disable heartbeat func WithHeartbeat(enable bool) Option { return func(o *Registry) { if o.cli != nil { o.cli.heartbeat = enable } } } // WithServiceResolver with endpoint function option. func WithServiceResolver(fn ServiceResolver) Option { return func(o *Registry) { if o.cli != nil { o.cli.resolver = fn } } } // WithHealthCheckInterval with healthcheck interval in seconds. func WithHealthCheckInterval(interval int) Option { return func(o *Registry) { if o.cli != nil { o.cli.healthcheckInterval = interval } } } // WithDeregisterCriticalServiceAfter with deregister-critical-service-after in seconds. func WithDeregisterCriticalServiceAfter(interval int) Option { return func(o *Registry) { if o.cli != nil { o.cli.deregisterCriticalServiceAfter = interval } } } // Config is consul registry config type Config struct { *api.Config } // Registry is consul registry type Registry struct { cli *Client enableHealthCheck bool registry map[string]*serviceSet lock sync.RWMutex } // New creates consul registry func New(apiClient *api.Client, opts ...Option) *Registry { r := &Registry{ cli: NewClient(apiClient), registry: make(map[string]*serviceSet), enableHealthCheck: true, } for _, o := range opts { o(r) } return r } // Register register service func (r *Registry) Register(ctx context.Context, svc *registry.ServiceInstance) error { return r.cli.Register(ctx, svc, r.enableHealthCheck) } // Deregister deregister service func (r *Registry) Deregister(ctx context.Context, svc *registry.ServiceInstance) error { return r.cli.Deregister(ctx, svc.ID) } // GetService return service by name func (r *Registry) GetService(ctx context.Context, name string) ([]*registry.ServiceInstance, error) { r.lock.RLock() defer r.lock.RUnlock() set := r.registry[name] getRemote := func() []*registry.ServiceInstance { services, _, err := r.cli.Service(ctx, name, 0, true) if err == nil && len(services) > 0 { return services } return nil } if set == nil { if s := getRemote(); len(s) > 0 { return s, nil } return nil, fmt.Errorf("service %s not resolved in registry", name) } ss, _ := set.services.Load().([]*registry.ServiceInstance) if ss == nil { if s := getRemote(); len(s) > 0 { return s, nil } return nil, fmt.Errorf("service %s not found in registry", name) } return ss, nil } // ListServices return service list. func (r *Registry) ListServices() (allServices map[string][]*registry.ServiceInstance, err error) { r.lock.RLock() defer r.lock.RUnlock() allServices = make(map[string][]*registry.ServiceInstance) for name, set := range r.registry { var services []*registry.ServiceInstance ss, _ := set.services.Load().([]*registry.ServiceInstance) if ss == nil { continue } services = append(services, ss...) allServices[name] = services } return } // Watch resolve service by name func (r *Registry) Watch(ctx context.Context, name string) (registry.Watcher, error) { r.lock.Lock() defer r.lock.Unlock() set, ok := r.registry[name] if !ok { set = &serviceSet{ watcher: make(map[*watcher]struct{}), services: &atomic.Value{}, serviceName: name, } r.registry[name] = set } // 初始化watcher w := &watcher{ event: make(chan struct{}, 1), } w.ctx, w.cancel = context.WithCancel(context.Background()) w.set = set set.lock.Lock() set.watcher[w] = struct{}{} set.lock.Unlock() ss, _ := set.services.Load().([]*registry.ServiceInstance) if len(ss) > 0 { // If the service has a value, it needs to be pushed to the watcher, // otherwise the initial data may be blocked forever during the watch. w.event <- struct{}{} } if !ok { err := r.resolve(set) if err != nil { return nil, err } } return w, nil } func (r *Registry) resolve(ss *serviceSet) error { ctx, cancel := context.WithTimeout(context.Background(), time.Second*10) services, idx, err := r.cli.Service(ctx, ss.serviceName, 0, true) cancel() if err != nil { return err } else if len(services) > 0 { ss.broadcast(services) } go func() { ticker := time.NewTicker(time.Second) defer ticker.Stop() for { <-ticker.C ctx, cancel := context.WithTimeout(context.Background(), time.Second*120) tmpService, tmpIdx, err := r.cli.Service(ctx, ss.serviceName, idx, true) cancel() if err != nil { time.Sleep(time.Second) continue } if len(tmpService) != 0 && tmpIdx != idx { services = tmpService ss.broadcast(services) } idx = tmpIdx } }() return nil }