You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
381 lines
10 KiB
381 lines
10 KiB
2 years ago
|
package polaris
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"fmt"
|
||
|
"net"
|
||
|
"net/url"
|
||
|
"strconv"
|
||
|
"time"
|
||
|
|
||
|
"github.com/google/uuid"
|
||
|
"github.com/polarismesh/polaris-go"
|
||
|
"github.com/polarismesh/polaris-go/pkg/model"
|
||
|
|
||
|
"github.com/go-kratos/kratos/v2/registry"
|
||
|
)
|
||
|
|
||
|
var (
|
||
|
_ registry.Registrar = (*Registry)(nil)
|
||
|
_ registry.Discovery = (*Registry)(nil)
|
||
|
)
|
||
|
|
||
|
type registryOptions struct {
|
||
|
// required, testNamespace in polaris
|
||
|
Namespace string
|
||
|
|
||
|
// required, service access token
|
||
|
ServiceToken string
|
||
|
|
||
|
// service weight in polaris. Default value is 100, 0 <= weight <= 10000
|
||
|
Weight int
|
||
|
|
||
|
// service priority. Default value is 0. The smaller the value, the lower the priority
|
||
|
Priority int
|
||
|
|
||
|
// To show service is healthy or not. Default value is True .
|
||
|
Healthy bool
|
||
|
|
||
|
// To show service is isolate or not. Default value is False .
|
||
|
Isolate bool
|
||
|
|
||
|
// TTL timeout. if node needs to use heartbeat to report,required. If not set,server will throw ErrorCode-400141
|
||
|
TTL int
|
||
|
|
||
|
// optional, Timeout for single query. Default value is global config
|
||
|
// Total is (1+RetryCount) * Timeout
|
||
|
Timeout time.Duration
|
||
|
|
||
|
// optional, retry count. Default value is global config
|
||
|
RetryCount int
|
||
|
}
|
||
|
|
||
|
// RegistryOption is polaris option.
|
||
|
type RegistryOption func(o *registryOptions)
|
||
|
|
||
|
// Registry is polaris registry.
|
||
|
type Registry struct {
|
||
|
opt registryOptions
|
||
|
provider polaris.ProviderAPI
|
||
|
consumer polaris.ConsumerAPI
|
||
|
}
|
||
|
|
||
|
// WithRegistryServiceToken with ServiceToken option.
|
||
|
func WithRegistryServiceToken(serviceToken string) RegistryOption {
|
||
|
return func(o *registryOptions) { o.ServiceToken = serviceToken }
|
||
|
}
|
||
|
|
||
|
// WithRegistryWeight with Weight option.
|
||
|
func WithRegistryWeight(weight int) RegistryOption {
|
||
|
return func(o *registryOptions) { o.Weight = weight }
|
||
|
}
|
||
|
|
||
|
// WithRegistryHealthy with Healthy option.
|
||
|
func WithRegistryHealthy(healthy bool) RegistryOption {
|
||
|
return func(o *registryOptions) { o.Healthy = healthy }
|
||
|
}
|
||
|
|
||
|
// WithRegistryIsolate with Isolate option.
|
||
|
func WithRegistryIsolate(isolate bool) RegistryOption {
|
||
|
return func(o *registryOptions) { o.Isolate = isolate }
|
||
|
}
|
||
|
|
||
|
// WithRegistryTTL with TTL option.
|
||
|
func WithRegistryTTL(TTL int) RegistryOption {
|
||
|
return func(o *registryOptions) { o.TTL = TTL }
|
||
|
}
|
||
|
|
||
|
// WithRegistryTimeout with Timeout option.
|
||
|
func WithRegistryTimeout(timeout time.Duration) RegistryOption {
|
||
|
return func(o *registryOptions) { o.Timeout = timeout }
|
||
|
}
|
||
|
|
||
|
// WithRegistryRetryCount with RetryCount option.
|
||
|
func WithRegistryRetryCount(retryCount int) RegistryOption {
|
||
|
return func(o *registryOptions) { o.RetryCount = retryCount }
|
||
|
}
|
||
|
|
||
|
// Register the registration.
|
||
|
func (r *Registry) Register(_ context.Context, instance *registry.ServiceInstance) error {
|
||
|
id := uuid.NewString()
|
||
|
for _, endpoint := range instance.Endpoints {
|
||
|
u, err := url.Parse(endpoint)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
host, port, err := net.SplitHostPort(u.Host)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
portNum, err := strconv.Atoi(port)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// metadata
|
||
|
if instance.Metadata == nil {
|
||
|
instance.Metadata = make(map[string]string)
|
||
|
}
|
||
|
instance.Metadata["merge"] = id
|
||
|
if _, ok := instance.Metadata["weight"]; !ok {
|
||
|
instance.Metadata["weight"] = strconv.Itoa(r.opt.Weight)
|
||
|
}
|
||
|
|
||
|
weight, _ := strconv.Atoi(instance.Metadata["weight"])
|
||
|
|
||
|
_, err = r.provider.RegisterInstance(
|
||
|
&polaris.InstanceRegisterRequest{
|
||
|
InstanceRegisterRequest: model.InstanceRegisterRequest{
|
||
|
Service: instance.Name,
|
||
|
ServiceToken: r.opt.ServiceToken,
|
||
|
Namespace: r.opt.Namespace,
|
||
|
Host: host,
|
||
|
Port: portNum,
|
||
|
Protocol: &u.Scheme,
|
||
|
Weight: &weight,
|
||
|
Priority: &r.opt.Priority,
|
||
|
Version: &instance.Version,
|
||
|
Metadata: instance.Metadata,
|
||
|
Healthy: &r.opt.Healthy,
|
||
|
Isolate: &r.opt.Isolate,
|
||
|
TTL: &r.opt.TTL,
|
||
|
Timeout: &r.opt.Timeout,
|
||
|
RetryCount: &r.opt.RetryCount,
|
||
|
},
|
||
|
},
|
||
|
)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// Deregister the registration.
|
||
|
func (r *Registry) Deregister(_ context.Context, serviceInstance *registry.ServiceInstance) error {
|
||
|
for _, endpoint := range serviceInstance.Endpoints {
|
||
|
// get url
|
||
|
u, err := url.Parse(endpoint)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// get host and port
|
||
|
host, port, err := net.SplitHostPort(u.Host)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
|
||
|
// port to int
|
||
|
portNum, err := strconv.Atoi(port)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
// Deregister
|
||
|
err = r.provider.Deregister(
|
||
|
&polaris.InstanceDeRegisterRequest{
|
||
|
InstanceDeRegisterRequest: model.InstanceDeRegisterRequest{
|
||
|
Service: serviceInstance.Name,
|
||
|
ServiceToken: r.opt.ServiceToken,
|
||
|
Namespace: r.opt.Namespace,
|
||
|
Host: host,
|
||
|
Port: portNum,
|
||
|
Timeout: &r.opt.Timeout,
|
||
|
RetryCount: &r.opt.RetryCount,
|
||
|
},
|
||
|
},
|
||
|
)
|
||
|
if err != nil {
|
||
|
return err
|
||
|
}
|
||
|
}
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
// GetService return the service instances in memory according to the service name.
|
||
|
func (r *Registry) GetService(_ context.Context, serviceName string) ([]*registry.ServiceInstance, error) {
|
||
|
// get all instances
|
||
|
instancesResponse, err := r.consumer.GetInstances(&polaris.GetInstancesRequest{
|
||
|
GetInstancesRequest: model.GetInstancesRequest{
|
||
|
Service: serviceName,
|
||
|
Namespace: r.opt.Namespace,
|
||
|
Timeout: &r.opt.Timeout,
|
||
|
RetryCount: &r.opt.RetryCount,
|
||
|
SkipRouteFilter: true,
|
||
|
},
|
||
|
})
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
serviceInstances := instancesToServiceInstances(merge(instancesResponse.GetInstances()))
|
||
|
|
||
|
return serviceInstances, nil
|
||
|
}
|
||
|
|
||
|
func merge(instances []model.Instance) map[string][]model.Instance {
|
||
|
m := make(map[string][]model.Instance)
|
||
|
for _, instance := range instances {
|
||
|
if v, ok := m[instance.GetMetadata()["merge"]]; ok {
|
||
|
m[instance.GetMetadata()["merge"]] = append(v, instance)
|
||
|
} else {
|
||
|
m[instance.GetMetadata()["merge"]] = []model.Instance{instance}
|
||
|
}
|
||
|
}
|
||
|
return m
|
||
|
}
|
||
|
|
||
|
// Watch creates a watcher according to the service name.
|
||
|
func (r *Registry) Watch(ctx context.Context, serviceName string) (registry.Watcher, error) {
|
||
|
return newWatcher(ctx, r.opt.Namespace, serviceName, r.consumer)
|
||
|
}
|
||
|
|
||
|
type Watcher struct {
|
||
|
ServiceName string
|
||
|
Namespace string
|
||
|
Ctx context.Context
|
||
|
Cancel context.CancelFunc
|
||
|
Channel <-chan model.SubScribeEvent
|
||
|
service *model.InstancesResponse
|
||
|
ServiceInstances map[string][]model.Instance
|
||
|
first bool
|
||
|
}
|
||
|
|
||
|
func newWatcher(ctx context.Context, namespace string, serviceName string, consumer polaris.ConsumerAPI) (*Watcher, error) {
|
||
|
watchServiceResponse, err := consumer.WatchService(&polaris.WatchServiceRequest{
|
||
|
WatchServiceRequest: model.WatchServiceRequest{
|
||
|
Key: model.ServiceKey{
|
||
|
Namespace: namespace,
|
||
|
Service: serviceName,
|
||
|
},
|
||
|
},
|
||
|
})
|
||
|
if err != nil {
|
||
|
return nil, err
|
||
|
}
|
||
|
|
||
|
w := &Watcher{
|
||
|
Namespace: namespace,
|
||
|
ServiceName: serviceName,
|
||
|
Channel: watchServiceResponse.EventChannel,
|
||
|
service: watchServiceResponse.GetAllInstancesResp,
|
||
|
ServiceInstances: merge(watchServiceResponse.GetAllInstancesResp.GetInstances()),
|
||
|
}
|
||
|
w.Ctx, w.Cancel = context.WithCancel(ctx)
|
||
|
return w, nil
|
||
|
}
|
||
|
|
||
|
// Next returns services in the following two cases:
|
||
|
// 1.the first time to watch and the service instance list is not empty.
|
||
|
// 2.any service instance changes found.
|
||
|
// if the above two conditions are not met, it will block until context deadline exceeded or canceled
|
||
|
func (w *Watcher) Next() ([]*registry.ServiceInstance, error) {
|
||
|
if !w.first {
|
||
|
w.first = true
|
||
|
if len(w.ServiceInstances) > 0 {
|
||
|
return instancesToServiceInstances(w.ServiceInstances), nil
|
||
|
}
|
||
|
}
|
||
|
select {
|
||
|
case <-w.Ctx.Done():
|
||
|
return nil, w.Ctx.Err()
|
||
|
case event := <-w.Channel:
|
||
|
if event.GetSubScribeEventType() == model.EventInstance {
|
||
|
// this always true, but we need to check it to make sure EventType not change
|
||
|
if instanceEvent, ok := event.(*model.InstanceEvent); ok {
|
||
|
// handle DeleteEvent
|
||
|
if instanceEvent.DeleteEvent != nil {
|
||
|
for _, instance := range instanceEvent.DeleteEvent.Instances {
|
||
|
delete(w.ServiceInstances, instance.GetMetadata()["merge"])
|
||
|
}
|
||
|
}
|
||
|
// handle UpdateEvent
|
||
|
if instanceEvent.UpdateEvent != nil {
|
||
|
for _, update := range instanceEvent.UpdateEvent.UpdateList {
|
||
|
if v, ok := w.ServiceInstances[update.After.GetMetadata()["merge"]]; ok {
|
||
|
var nv []model.Instance
|
||
|
m := map[string]model.Instance{}
|
||
|
for _, ins := range v {
|
||
|
m[ins.GetId()] = ins
|
||
|
}
|
||
|
m[update.After.GetId()] = update.After
|
||
|
for _, ins := range m {
|
||
|
if ins.IsHealthy() {
|
||
|
nv = append(nv, ins)
|
||
|
}
|
||
|
}
|
||
|
w.ServiceInstances[update.After.GetMetadata()["merge"]] = nv
|
||
|
if len(nv) == 0 {
|
||
|
delete(w.ServiceInstances, update.After.GetMetadata()["merge"])
|
||
|
}
|
||
|
} else {
|
||
|
if update.After.IsHealthy() {
|
||
|
w.ServiceInstances[update.After.GetMetadata()["merge"]] = []model.Instance{update.After}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
// handle AddEvent
|
||
|
if instanceEvent.AddEvent != nil {
|
||
|
for _, instance := range instanceEvent.AddEvent.Instances {
|
||
|
if v, ok := w.ServiceInstances[instance.GetMetadata()["merge"]]; ok {
|
||
|
var nv []model.Instance
|
||
|
m := map[string]model.Instance{}
|
||
|
for _, ins := range v {
|
||
|
m[ins.GetId()] = ins
|
||
|
}
|
||
|
m[instance.GetId()] = instance
|
||
|
for _, ins := range m {
|
||
|
if ins.IsHealthy() {
|
||
|
nv = append(nv, ins)
|
||
|
}
|
||
|
}
|
||
|
if len(nv) != 0 {
|
||
|
w.ServiceInstances[instance.GetMetadata()["merge"]] = nv
|
||
|
}
|
||
|
} else {
|
||
|
if instance.IsHealthy() {
|
||
|
w.ServiceInstances[instance.GetMetadata()["merge"]] = []model.Instance{instance}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
return instancesToServiceInstances(w.ServiceInstances), nil
|
||
|
}
|
||
|
}
|
||
|
return instancesToServiceInstances(w.ServiceInstances), nil
|
||
|
}
|
||
|
|
||
|
// Stop close the watcher.
|
||
|
func (w *Watcher) Stop() error {
|
||
|
w.Cancel()
|
||
|
return nil
|
||
|
}
|
||
|
|
||
|
func instancesToServiceInstances(instances map[string][]model.Instance) []*registry.ServiceInstance {
|
||
|
serviceInstances := make([]*registry.ServiceInstance, 0, len(instances))
|
||
|
for _, inss := range instances {
|
||
|
if len(inss) == 0 {
|
||
|
continue
|
||
|
}
|
||
|
ins := ®istry.ServiceInstance{
|
||
|
ID: inss[0].GetId(),
|
||
|
Name: inss[0].GetService(),
|
||
|
Version: inss[0].GetVersion(),
|
||
|
Metadata: inss[0].GetMetadata(),
|
||
|
}
|
||
|
for _, item := range inss {
|
||
|
if item.IsHealthy() {
|
||
|
ins.Endpoints = append(ins.Endpoints, fmt.Sprintf("%s://%s:%d", item.GetProtocol(), item.GetHost(), item.GetPort()))
|
||
|
}
|
||
|
}
|
||
|
if len(ins.Endpoints) != 0 {
|
||
|
serviceInstances = append(serviceInstances, ins)
|
||
|
}
|
||
|
}
|
||
|
return serviceInstances
|
||
|
}
|