diff --git a/contrib/registry/polaris/registry.go b/contrib/registry/polaris/registry.go index f1bb7a791..88ea02a6e 100644 --- a/contrib/registry/polaris/registry.go +++ b/contrib/registry/polaris/registry.go @@ -2,6 +2,7 @@ package polaris import ( "context" + "fmt" "net" "net/url" "strconv" @@ -11,11 +12,16 @@ import ( "github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/registry" + "github.com/polarismesh/polaris-go/api" + "github.com/polarismesh/polaris-go/pkg/config" "github.com/polarismesh/polaris-go/pkg/model" ) -var _ registry.Registrar = (*Registry)(nil) +var ( + _ registry.Registrar = (*Registry)(nil) + _ registry.Discovery = (*Registry)(nil) +) // _instanceIDSeparator . Instance id Separator. const _instanceIDSeparator = "-" @@ -40,6 +46,9 @@ type options struct { // To show service is healthy or not. Default value is True . Healthy bool + // Heartbeat enable .Not in polaris . Default value is True. + Heartbeat bool + // To show service is isolate or not. Default value is False . Isolate bool @@ -61,6 +70,7 @@ type Option func(o *options) type Registry struct { opt options provider api.ProviderAPI + consumer api.ConsumerAPI } // WithNamespace with Namespace option. @@ -108,7 +118,12 @@ func WithRetryCount(retryCount int) Option { return func(o *options) { o.RetryCount = retryCount } } -func NewRegistry(provider api.ProviderAPI, opts ...Option) (r *Registry) { +// WithHeartbeat . with Heartbeat option. +func WithHeartbeat(heartbeat bool) Option { + return func(o *options) { o.Heartbeat = heartbeat } +} + +func NewRegistry(provider api.ProviderAPI, consumer api.ConsumerAPI, opts ...Option) (r *Registry) { op := options{ Namespace: "default", ServiceToken: "", @@ -116,6 +131,7 @@ func NewRegistry(provider api.ProviderAPI, opts ...Option) (r *Registry) { Weight: 0, Priority: 0, Healthy: true, + Heartbeat: true, Isolate: false, TTL: 0, Timeout: 0, @@ -127,9 +143,22 @@ func NewRegistry(provider api.ProviderAPI, opts ...Option) (r *Registry) { return &Registry{ opt: op, provider: provider, + consumer: consumer, } } +func NewRegistryWithConfig(conf config.Configuration, opts ...Option) (r *Registry) { + provider, err := api.NewProviderAPIByConfig(conf) + if err != nil { + panic(err) + } + consumer, err := api.NewConsumerAPIByConfig(conf) + if err != nil { + panic(err) + } + return NewRegistry(provider, consumer, opts...) +} + // Register the registration. func (r *Registry) Register(_ context.Context, serviceInstance *registry.ServiceInstance) error { ids := make([]string, 0, len(serviceInstance.Endpoints)) @@ -193,32 +222,34 @@ func (r *Registry) Register(_ context.Context, serviceInstance *registry.Service } instanceID := service.InstanceID - // start heartbeat report - go func() { - ticker := time.NewTicker(time.Second * time.Duration(r.opt.TTL)) - defer ticker.Stop() - - for { - <-ticker.C - - err = r.provider.Heartbeat(&api.InstanceHeartbeatRequest{ - InstanceHeartbeatRequest: model.InstanceHeartbeatRequest{ - Service: serviceInstance.Name + u.Scheme, - Namespace: r.opt.Namespace, - Host: host, - Port: portNum, - ServiceToken: r.opt.ServiceToken, - InstanceID: instanceID, - Timeout: &r.opt.Timeout, - RetryCount: &r.opt.RetryCount, - }, - }) - if err != nil { - log.Error(err.Error()) - continue + if r.opt.Heartbeat { + // start heartbeat report + go func() { + ticker := time.NewTicker(time.Second * time.Duration(r.opt.TTL)) + defer ticker.Stop() + + for { + <-ticker.C + + err = r.provider.Heartbeat(&api.InstanceHeartbeatRequest{ + InstanceHeartbeatRequest: model.InstanceHeartbeatRequest{ + Service: serviceInstance.Name + u.Scheme, + Namespace: r.opt.Namespace, + Host: host, + Port: portNum, + ServiceToken: r.opt.ServiceToken, + InstanceID: instanceID, + Timeout: &r.opt.Timeout, + RetryCount: &r.opt.RetryCount, + }, + }) + if err != nil { + log.Error(err.Error()) + continue + } } - } - }() + }() + } ids = append(ids, instanceID) } @@ -228,7 +259,7 @@ func (r *Registry) Register(_ context.Context, serviceInstance *registry.Service } // Deregister the registration. -func (r *Registry) Deregister(ctx context.Context, serviceInstance *registry.ServiceInstance) error { +func (r *Registry) Deregister(_ context.Context, serviceInstance *registry.ServiceInstance) error { split := strings.Split(serviceInstance.ID, _instanceIDSeparator) for i, endpoint := range serviceInstance.Endpoints { // get url @@ -269,3 +300,138 @@ func (r *Registry) Deregister(ctx context.Context, serviceInstance *registry.Ser } return nil } + +// GetService return the service instances in memory according to the service name. +func (r *Registry) GetService(_ context.Context, serviceName string) ([]*registry.ServiceInstance, error) { + // get all instances + instancesResponse, err := r.consumer.GetAllInstances(&api.GetAllInstancesRequest{ + GetAllInstancesRequest: model.GetAllInstancesRequest{ + Service: serviceName, + Namespace: r.opt.Namespace, + Timeout: &r.opt.Timeout, + RetryCount: &r.opt.RetryCount, + }, + }) + if err != nil { + return nil, err + } + + serviceInstances := instancesToServiceInstances(instancesResponse.GetInstances()) + + return serviceInstances, nil +} + +// Watch creates a watcher according to the service name. +func (r *Registry) Watch(ctx context.Context, serviceName string) (registry.Watcher, error) { + return newWatcher(ctx, r.opt.Namespace, serviceName, r.consumer) +} + +type Watcher struct { + ServiceName string + Namespace string + Ctx context.Context + Cancel context.CancelFunc + Channel <-chan model.SubScribeEvent + ServiceInstances []*registry.ServiceInstance +} + +func newWatcher(ctx context.Context, namespace string, serviceName string, consumer api.ConsumerAPI) (*Watcher, error) { + watchServiceResponse, err := consumer.WatchService(&api.WatchServiceRequest{ + WatchServiceRequest: model.WatchServiceRequest{ + Key: model.ServiceKey{ + Namespace: namespace, + Service: serviceName, + }, + }, + }) + if err != nil { + return nil, err + } + + w := &Watcher{ + Namespace: namespace, + ServiceName: serviceName, + Channel: watchServiceResponse.EventChannel, + ServiceInstances: instancesToServiceInstances(watchServiceResponse.GetAllInstancesResp.GetInstances()), + } + w.Ctx, w.Cancel = context.WithCancel(ctx) + return w, nil +} + +// Next returns services in the following two cases: +// 1.the first time to watch and the service instance list is not empty. +// 2.any service instance changes found. +// if the above two conditions are not met, it will block until context deadline exceeded or canceled +func (w *Watcher) Next() ([]*registry.ServiceInstance, error) { + select { + case <-w.Ctx.Done(): + return nil, w.Ctx.Err() + case event := <-w.Channel: + if event.GetSubScribeEventType() == model.EventInstance { + // this always true, but we need to check it to make sure EventType not change + if instanceEvent, ok := event.(*model.InstanceEvent); ok { + // handle DeleteEvent + if instanceEvent.DeleteEvent != nil { + for _, instance := range instanceEvent.DeleteEvent.Instances { + for i, serviceInstance := range w.ServiceInstances { + if serviceInstance.ID == instance.GetId() { + // remove equal + if len(w.ServiceInstances) <= 1 { + w.ServiceInstances = w.ServiceInstances[0:0] + continue + } + w.ServiceInstances = append(w.ServiceInstances[:i], w.ServiceInstances[i+1:]...) + } + } + } + } + // handle UpdateEvent + if instanceEvent.UpdateEvent != nil { + for i, serviceInstance := range w.ServiceInstances { + for _, update := range instanceEvent.UpdateEvent.UpdateList { + if serviceInstance.ID == update.Before.GetId() { + w.ServiceInstances[i] = instanceToServiceInstance(update.After) + } + } + } + } + // handle AddEvent + if instanceEvent.AddEvent != nil { + w.ServiceInstances = append(w.ServiceInstances, instancesToServiceInstances(instanceEvent.AddEvent.Instances)...) + } + } + return w.ServiceInstances, nil + } + } + return w.ServiceInstances, nil +} + +// Stop close the watcher. +func (w *Watcher) Stop() error { + w.Cancel() + return nil +} + +func instancesToServiceInstances(instances []model.Instance) []*registry.ServiceInstance { + serviceInstances := make([]*registry.ServiceInstance, 0, len(instances)) + for _, instance := range instances { + serviceInstances = append(serviceInstances, instanceToServiceInstance(instance)) + } + return serviceInstances +} + +func instanceToServiceInstance(instance model.Instance) *registry.ServiceInstance { + metadata := instance.GetMetadata() + // Usually, it won't fail in kratos if register correctly + kind := "" + if k, ok := metadata["kind"]; ok { + kind = k + } + return ®istry.ServiceInstance{ + ID: instance.GetId(), + Name: instance.GetService(), + Version: metadata["version"], + Metadata: metadata, + Endpoints: []string{fmt.Sprintf("%s://%s:%d", kind, instance.GetHost(), instance.GetPort())}, + } +} diff --git a/contrib/registry/polaris/registry_test.go b/contrib/registry/polaris/registry_test.go index af64230cb..c6cc49f22 100644 --- a/contrib/registry/polaris/registry_test.go +++ b/contrib/registry/polaris/registry_test.go @@ -5,25 +5,21 @@ import ( "testing" "time" + "github.com/go-kratos/kratos/v2/log" + "github.com/polarismesh/polaris-go/pkg/config" "github.com/go-kratos/kratos/v2/registry" - "github.com/polarismesh/polaris-go/api" ) // TestRegistry . TestRegistryManyService func TestRegistry(t *testing.T) { conf := config.NewDefaultConfiguration([]string{"127.0.0.1:8091"}) - provider, err := api.NewProviderAPIByConfig(conf) - defer provider.Destroy() - if err != nil { - t.Fatal(err) - } - r := NewRegistry( - provider, - WithTimeout(1*time.Second), - WithTTL(5), + r := NewRegistryWithConfig( + conf, + WithTimeout(time.Second*10), + WithTTL(100), ) ctx := context.Background() @@ -35,7 +31,7 @@ func TestRegistry(t *testing.T) { Endpoints: []string{"tcp://127.0.0.1:9000?isSecure=false"}, } - err = r.Register(ctx, svc) + err := r.Register(ctx, svc) if err != nil { t.Fatal(err) } @@ -49,38 +45,33 @@ func TestRegistry(t *testing.T) { // TestRegistryMany . TestRegistryManyService func TestRegistryMany(t *testing.T) { conf := config.NewDefaultConfiguration([]string{"127.0.0.1:8091"}) - provider, err := api.NewProviderAPIByConfig(conf) - defer provider.Destroy() - if err != nil { - t.Fatal(err) - } - r := NewRegistry( - provider, - WithTimeout(1*time.Second), - WithTTL(10), + r := NewRegistryWithConfig( + conf, + WithTimeout(time.Second*10), + WithTTL(100), ) svc := ®istry.ServiceInstance{ - Name: "kratos-provider-0-", + Name: "kratos-provider-1-", Version: "test", Metadata: map[string]string{"app": "kratos"}, Endpoints: []string{"tcp://127.0.0.1:9000?isSecure=false"}, } svc1 := ®istry.ServiceInstance{ - Name: "kratos-provider-1-", + Name: "kratos-provider-2-", Version: "test", Metadata: map[string]string{"app": "kratos"}, Endpoints: []string{"tcp://127.0.0.1:9001?isSecure=false"}, } svc2 := ®istry.ServiceInstance{ - Name: "kratos-provider-2-", + Name: "kratos-provider-3-", Version: "test", Metadata: map[string]string{"app": "kratos"}, Endpoints: []string{"tcp://127.0.0.1:9002?isSecure=false"}, } - err = r.Register(context.Background(), svc) + err := r.Register(context.Background(), svc) if err != nil { t.Fatal(err) } @@ -110,3 +101,106 @@ func TestRegistryMany(t *testing.T) { t.Fatal(err) } } + +// TestGetService . TestGetService +func TestGetService(t *testing.T) { + conf := config.NewDefaultConfiguration([]string{"127.0.0.1:8091"}) + + r := NewRegistryWithConfig( + conf, + WithTimeout(time.Second*10), + WithTTL(100), + ) + + ctx := context.Background() + + svc := ®istry.ServiceInstance{ + Name: "kratos-provider-4-", + Version: "test", + Metadata: map[string]string{"app": "kratos"}, + Endpoints: []string{"tcp://127.0.0.1:9000?isSecure=false"}, + } + + err := r.Register(ctx, svc) + if err != nil { + t.Fatal(err) + } + time.Sleep(time.Second * 1) + serviceInstances, err := r.GetService(ctx, "kratos-provider-4-tcp") + if err != nil { + t.Fatal(err) + } + for _, instance := range serviceInstances { + log.Info(instance) + } + + err = r.Deregister(ctx, svc) + if err != nil { + t.Fatal(err) + } +} + +// TestWatch . TestWatch +func TestWatch(t *testing.T) { + conf := config.NewDefaultConfiguration([]string{"127.0.0.1:8091"}) + + r := NewRegistryWithConfig( + conf, + WithTimeout(time.Second*10), + WithTTL(100), + ) + + svc := ®istry.ServiceInstance{ + Name: "kratos-provider-4-", + Version: "test", + Metadata: map[string]string{"app": "kratos"}, + Endpoints: []string{"tcp://127.0.0.1:9000?isSecure=false"}, + } + + watch, err := r.Watch(context.Background(), "kratos-provider-4-tcp") + if err != nil { + t.Fatal(err) + } + + err = r.Register(context.Background(), svc) + if err != nil { + t.Fatal(err) + } + // watch svc + time.Sleep(time.Second * 1) + + // svc register, AddEvent + next, err := watch.Next() + if err != nil { + t.Fatal(err) + } + for _, instance := range next { + // it will output one instance + log.Info(instance) + } + + err = r.Deregister(context.Background(), svc) + if err != nil { + t.Fatal(err) + } + + // svc deregister, DeleteEvent + next, err = watch.Next() + if err != nil { + t.Fatal(err) + } + for _, instance := range next { + // it will output nothing + log.Info(instance) + } + + err = watch.Stop() + if err != nil { + t.Fatal(err) + } + _, err = watch.Next() + if err == nil { + // if nil, stop failed + t.Fatal() + } +}