package polaris import ( "context" "fmt" "net" "net/url" "strconv" "time" "github.com/google/uuid" "github.com/polarismesh/polaris-go" "github.com/polarismesh/polaris-go/pkg/model" "github.com/go-kratos/kratos/v2/registry" ) var ( _ registry.Registrar = (*Registry)(nil) _ registry.Discovery = (*Registry)(nil) ) type registryOptions struct { // required, testNamespace in polaris Namespace string // required, service access token ServiceToken string // service weight in polaris. Default value is 100, 0 <= weight <= 10000 Weight int // service priority. Default value is 0. The smaller the value, the lower the priority Priority int // To show service is healthy or not. Default value is True . Healthy bool // To show service is isolate or not. Default value is False . Isolate bool // TTL timeout. if node needs to use heartbeat to report,required. If not set,server will throw ErrorCode-400141 TTL int // optional, Timeout for single query. Default value is global config // Total is (1+RetryCount) * Timeout Timeout time.Duration // optional, retry count. Default value is global config RetryCount int } // RegistryOption is polaris option. type RegistryOption func(o *registryOptions) // Registry is polaris registry. type Registry struct { opt registryOptions provider polaris.ProviderAPI consumer polaris.ConsumerAPI } // WithRegistryServiceToken with ServiceToken option. func WithRegistryServiceToken(serviceToken string) RegistryOption { return func(o *registryOptions) { o.ServiceToken = serviceToken } } // WithRegistryWeight with Weight option. func WithRegistryWeight(weight int) RegistryOption { return func(o *registryOptions) { o.Weight = weight } } // WithRegistryHealthy with Healthy option. func WithRegistryHealthy(healthy bool) RegistryOption { return func(o *registryOptions) { o.Healthy = healthy } } // WithRegistryIsolate with Isolate option. 
func WithRegistryIsolate(isolate bool) RegistryOption {
	return func(o *registryOptions) { o.Isolate = isolate }
}

// WithRegistryTTL with TTL option.
func WithRegistryTTL(TTL int) RegistryOption {
	return func(o *registryOptions) { o.TTL = TTL }
}

// WithRegistryTimeout with Timeout option.
func WithRegistryTimeout(timeout time.Duration) RegistryOption {
	return func(o *registryOptions) { o.Timeout = timeout }
}

// WithRegistryRetryCount with RetryCount option.
func WithRegistryRetryCount(retryCount int) RegistryOption {
	return func(o *registryOptions) { o.RetryCount = retryCount }
}

// Register the registration.
//
// Each endpoint of instance is registered as its own polaris instance. All of
// them carry the same freshly generated "merge" metadata value, which is the
// key the discovery side of this package groups instances by.
//
// instance.Metadata is modified in place: it is allocated when nil, "merge" is
// overwritten, and "weight" is filled from the registry options when absent.
//
// An error is returned when an endpoint cannot be split into scheme, host and
// numeric port, when the "weight" metadata is not an integer, or when polaris
// rejects a registration. Endpoints and weight are validated before the first
// request is sent, so invalid input never leaves a partial registration.
func (r *Registry) Register(_ context.Context, instance *registry.ServiceInstance) error {
	type target struct {
		scheme string
		host   string
		port   int
	}

	// Validate every endpoint up front.
	targets := make([]target, 0, len(instance.Endpoints))
	for _, endpoint := range instance.Endpoints {
		u, err := url.Parse(endpoint)
		if err != nil {
			return fmt.Errorf("polaris: parse endpoint %q: %w", endpoint, err)
		}
		host, port, err := net.SplitHostPort(u.Host)
		if err != nil {
			return fmt.Errorf("polaris: split host and port of endpoint %q: %w", endpoint, err)
		}
		portNum, err := strconv.Atoi(port)
		if err != nil {
			return fmt.Errorf("polaris: parse port of endpoint %q: %w", endpoint, err)
		}
		targets = append(targets, target{scheme: u.Scheme, host: host, port: portNum})
	}
	if len(targets) == 0 {
		// Nothing to register; leave the caller's metadata untouched.
		return nil
	}

	// metadata
	if instance.Metadata == nil {
		instance.Metadata = make(map[string]string)
	}
	instance.Metadata["merge"] = uuid.NewString()
	if _, ok := instance.Metadata["weight"]; !ok {
		instance.Metadata["weight"] = strconv.Itoa(r.opt.Weight)
	}
	// A malformed weight used to be silently registered as 0; report it instead.
	weight, err := strconv.Atoi(instance.Metadata["weight"])
	if err != nil {
		return fmt.Errorf("polaris: invalid weight metadata %q: %w", instance.Metadata["weight"], err)
	}

	for i := range targets {
		t := &targets[i]
		_, err = r.provider.RegisterInstance(
			&polaris.InstanceRegisterRequest{
				InstanceRegisterRequest: model.InstanceRegisterRequest{
					Service:      instance.Name,
					ServiceToken: r.opt.ServiceToken,
					Namespace:    r.opt.Namespace,
					Host:         t.host,
					Port:         t.port,
					Protocol:     &t.scheme,
					Weight:       &weight,
					Priority:     &r.opt.Priority,
					Version:      &instance.Version,
					Metadata:     instance.Metadata,
					Healthy:      &r.opt.Healthy,
					Isolate:      &r.opt.Isolate,
					TTL:          &r.opt.TTL,
					Timeout:      &r.opt.Timeout,
					RetryCount:   &r.opt.RetryCount,
				},
			},
		)
		if err != nil {
			return err
		}
	}
	return nil
}

// Deregister the registration.
func (r *Registry) Deregister(_ context.Context, serviceInstance *registry.ServiceInstance) error { for _, endpoint := range serviceInstance.Endpoints { // get url u, err := url.Parse(endpoint) if err != nil { return err } // get host and port host, port, err := net.SplitHostPort(u.Host) if err != nil { return err } // port to int portNum, err := strconv.Atoi(port) if err != nil { return err } // Deregister err = r.provider.Deregister( &polaris.InstanceDeRegisterRequest{ InstanceDeRegisterRequest: model.InstanceDeRegisterRequest{ Service: serviceInstance.Name, ServiceToken: r.opt.ServiceToken, Namespace: r.opt.Namespace, Host: host, Port: portNum, Timeout: &r.opt.Timeout, RetryCount: &r.opt.RetryCount, }, }, ) if err != nil { return err } } return nil } // GetService return the service instances in memory according to the service name. func (r *Registry) GetService(_ context.Context, serviceName string) ([]*registry.ServiceInstance, error) { // get all instances instancesResponse, err := r.consumer.GetInstances(&polaris.GetInstancesRequest{ GetInstancesRequest: model.GetInstancesRequest{ Service: serviceName, Namespace: r.opt.Namespace, Timeout: &r.opt.Timeout, RetryCount: &r.opt.RetryCount, SkipRouteFilter: true, }, }) if err != nil { return nil, err } serviceInstances := instancesToServiceInstances(merge(instancesResponse.GetInstances())) return serviceInstances, nil } func merge(instances []model.Instance) map[string][]model.Instance { m := make(map[string][]model.Instance) for _, instance := range instances { if v, ok := m[instance.GetMetadata()["merge"]]; ok { m[instance.GetMetadata()["merge"]] = append(v, instance) } else { m[instance.GetMetadata()["merge"]] = []model.Instance{instance} } } return m } // Watch creates a watcher according to the service name. 
func (r *Registry) Watch(ctx context.Context, serviceName string) (registry.Watcher, error) { return newWatcher(ctx, r.opt.Namespace, serviceName, r.consumer) } type Watcher struct { ServiceName string Namespace string Ctx context.Context Cancel context.CancelFunc Channel <-chan model.SubScribeEvent service *model.InstancesResponse ServiceInstances map[string][]model.Instance first bool } func newWatcher(ctx context.Context, namespace string, serviceName string, consumer polaris.ConsumerAPI) (*Watcher, error) { watchServiceResponse, err := consumer.WatchService(&polaris.WatchServiceRequest{ WatchServiceRequest: model.WatchServiceRequest{ Key: model.ServiceKey{ Namespace: namespace, Service: serviceName, }, }, }) if err != nil { return nil, err } w := &Watcher{ Namespace: namespace, ServiceName: serviceName, Channel: watchServiceResponse.EventChannel, service: watchServiceResponse.GetAllInstancesResp, ServiceInstances: merge(watchServiceResponse.GetAllInstancesResp.GetInstances()), } w.Ctx, w.Cancel = context.WithCancel(ctx) return w, nil } // Next returns services in the following two cases: // 1.the first time to watch and the service instance list is not empty. // 2.any service instance changes found. 
// if the above two conditions are not met, it will block until context deadline exceeded or canceled func (w *Watcher) Next() ([]*registry.ServiceInstance, error) { if !w.first { w.first = true if len(w.ServiceInstances) > 0 { return instancesToServiceInstances(w.ServiceInstances), nil } } select { case <-w.Ctx.Done(): return nil, w.Ctx.Err() case event := <-w.Channel: if event.GetSubScribeEventType() == model.EventInstance { // this always true, but we need to check it to make sure EventType not change if instanceEvent, ok := event.(*model.InstanceEvent); ok { // handle DeleteEvent if instanceEvent.DeleteEvent != nil { for _, instance := range instanceEvent.DeleteEvent.Instances { delete(w.ServiceInstances, instance.GetMetadata()["merge"]) } } // handle UpdateEvent if instanceEvent.UpdateEvent != nil { for _, update := range instanceEvent.UpdateEvent.UpdateList { if v, ok := w.ServiceInstances[update.After.GetMetadata()["merge"]]; ok { var nv []model.Instance m := map[string]model.Instance{} for _, ins := range v { m[ins.GetId()] = ins } m[update.After.GetId()] = update.After for _, ins := range m { if ins.IsHealthy() { nv = append(nv, ins) } } w.ServiceInstances[update.After.GetMetadata()["merge"]] = nv if len(nv) == 0 { delete(w.ServiceInstances, update.After.GetMetadata()["merge"]) } } else { if update.After.IsHealthy() { w.ServiceInstances[update.After.GetMetadata()["merge"]] = []model.Instance{update.After} } } } } // handle AddEvent if instanceEvent.AddEvent != nil { for _, instance := range instanceEvent.AddEvent.Instances { if v, ok := w.ServiceInstances[instance.GetMetadata()["merge"]]; ok { var nv []model.Instance m := map[string]model.Instance{} for _, ins := range v { m[ins.GetId()] = ins } m[instance.GetId()] = instance for _, ins := range m { if ins.IsHealthy() { nv = append(nv, ins) } } if len(nv) != 0 { w.ServiceInstances[instance.GetMetadata()["merge"]] = nv } } else { if instance.IsHealthy() { w.ServiceInstances[instance.GetMetadata()["merge"]] 
= []model.Instance{instance} } } } } } return instancesToServiceInstances(w.ServiceInstances), nil } } return instancesToServiceInstances(w.ServiceInstances), nil } // Stop close the watcher. func (w *Watcher) Stop() error { w.Cancel() return nil } func instancesToServiceInstances(instances map[string][]model.Instance) []*registry.ServiceInstance { serviceInstances := make([]*registry.ServiceInstance, 0, len(instances)) for _, inss := range instances { if len(inss) == 0 { continue } ins := ®istry.ServiceInstance{ ID: inss[0].GetId(), Name: inss[0].GetService(), Version: inss[0].GetVersion(), Metadata: inss[0].GetMetadata(), } for _, item := range inss { if item.IsHealthy() { ins.Endpoints = append(ins.Endpoints, fmt.Sprintf("%s://%s:%d", item.GetProtocol(), item.GetHost(), item.GetPort())) } } if len(ins.Endpoints) != 0 { serviceInstances = append(serviceInstances, ins) } } return serviceInstances }