// The package registry simply implements the Kubernetes-based Registry package kuberegistry import ( "context" "errors" "fmt" "io/ioutil" "net/url" "os" "strconv" "strings" "time" "github.com/go-kratos/kratos/v2/registry" jsoniter "github.com/json-iterator/go" corev1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/informers" "k8s.io/client-go/kubernetes" listerv1 "k8s.io/client-go/listers/core/v1" "k8s.io/client-go/tools/cache" ) // Defines the key name of specific fields // Kratos needs to cooperate with the following fields to run properly on Kubernetes: // kratos-service-id: define the ID of the service // kratos-service-app: define the name of the service // kratos-service-version: define the version of the service // kratos-service-metadata: define the metadata of the service // kratos-service-protocols: define the protocols of the service // // Example Deployment: // // apiVersion: apps/v1 // kind: Deployment // metadata: // name: nginx // labels: // app: nginx // spec: // replicas: 5 // selector: // matchLabels: // app: nginx // template: // metadata: // labels: // app: nginx // kratos-service-id: "56991810-c77f-4a95-8190-393efa9c1a61" // kratos-service-app: "nginx" // kratos-service-version: "v3.5.0" // annotations: // kratos-service-protocols: | // {"80": "http"} // kratos-service-metadata: | // {"region": "sh", "zone": "sh001", "cluster": "pd"} // spec: // containers: // - name: nginx // image: nginx:1.7.9 // ports: // - containerPort: 80 // // const ( // LabelsKeyServiceID is used to define the ID of the service LabelsKeyServiceID = "kratos-service-id" // LabelsKeyServiceName is used to define the name of the service LabelsKeyServiceName = "kratos-service-app" // LabelsKeyServiceVersion is used to define the version of the service LabelsKeyServiceVersion = "kratos-service-version" // AnnotationsKeyMetadata is used to define the metadata of the service AnnotationsKeyMetadata = "kratos-service-metadata" // AnnotationsKeyProtocolMap is used to define the protocols of the service // Through the value of this field, Kratos can obtain the application layer protocol corresponding to the port // Example value: {"80": "http", "8081": "grpc"} AnnotationsKeyProtocolMap = "kratos-service-protocols" ) // The registry simply implements service discovery based on Kubernetes // It has not been verified in the production environment and is currently for reference only type Registry struct { clientSet *kubernetes.Clientset informerFactory informers.SharedInformerFactory podInformer cache.SharedIndexInformer podLister listerv1.PodLister stopCh chan struct{} } // NewRegistry is used to initialize the Registry func NewRegistry(clientSet *kubernetes.Clientset) *Registry { informerFactory := informers.NewSharedInformerFactory(clientSet, time.Minute*10) podInformer := informerFactory.Core().V1().Pods().Informer() podLister := informerFactory.Core().V1().Pods().Lister() return &Registry{ clientSet: clientSet, informerFactory: informerFactory, podInformer: podInformer, podLister: podLister, stopCh: make(chan struct{}), } } // Register is used to register services // Note that on Kubernetes, it can only be used to update the id/name/version/metadata/protocols of the current service, // but it cannot be used to update node. func (s *Registry) Register(service *registry.ServiceInstance) error { // GetMetadata metadataVal, err := marshal(service.Metadata) if err != nil { return err } // Generate ProtocolMap protocolMap, err := getProtocolMapByEndpoints(service.Endpoints) if err != nil { return err } protocolMapVal, err := marshal(protocolMap) if err != nil { return err } patchBytes, err := jsoniter.Marshal(map[string]interface{}{ "metadata": metav1.ObjectMeta{ Labels: map[string]string{ LabelsKeyServiceID: service.ID, LabelsKeyServiceName: service.Name, LabelsKeyServiceVersion: service.Version, }, Annotations: map[string]string{ AnnotationsKeyMetadata: metadataVal, AnnotationsKeyProtocolMap: protocolMapVal, }, }, }) if err != nil { return err } if _, err = s.clientSet. CoreV1(). Pods(GetNamespace()). Patch(context.TODO(), GetPodName(), types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{}); err != nil { return err } return nil } // Deregister the registration. func (s *Registry) Deregister(service *registry.ServiceInstance) error { return s.Register(®istry.ServiceInstance{ Metadata: map[string]string{}, }) } // Service return the service instances in memory according to the service name. func (s *Registry) Service(name string) ([]*registry.ServiceInstance, error) { pods, err := s.podLister.List(labels.SelectorFromSet(map[string]string{ LabelsKeyServiceName: name, })) if err != nil { return nil, err } var ret []*registry.ServiceInstance for _, pod := range pods { if pod.Status.Phase != corev1.PodRunning { continue } instance, err := getServiceInstanceFromPod(pod) if err != nil { return nil, err } ret = append(ret, instance) } return ret, nil } func (s *Registry) sendLatestInstances(name string, announcement chan []*registry.ServiceInstance) { instances, err := s.Service(name) if err != nil { panic(err) } announcement <- instances } // Watch creates a watcher according to the service name. func (s *Registry) Watch(name string) (registry.Watcher, error) { stopCh := make(chan struct{}, 1) announcement := make(chan []*registry.ServiceInstance, 1) s.podInformer.AddEventHandler(cache.FilteringResourceEventHandler{ FilterFunc: func(obj interface{}) bool { select { case <-stopCh: return false case <-s.stopCh: return false default: pod := obj.(*corev1.Pod) val := pod.GetLabels()[LabelsKeyServiceName] return val == name } }, Handler: cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { s.sendLatestInstances(name, announcement) }, UpdateFunc: func(oldObj, newObj interface{}) { s.sendLatestInstances(name, announcement) }, DeleteFunc: func(obj interface{}) { s.sendLatestInstances(name, announcement) }, }, }) return NewIterator(announcement, stopCh), nil } // Start is used to start the Registry // It is non-blocking func (s *Registry) Start() { s.informerFactory.Start(s.stopCh) if !cache.WaitForCacheSync(s.stopCh, s.podInformer.HasSynced) { return } } // Close is used to close the Registry // After closing, any callbacks generated by Watch will not be executed func (s *Registry) Close() { select { case <-s.stopCh: default: close(s.stopCh) } } // //////////// K8S Runtime //////////// // ServiceAccountNamespacePath defines the location of the namespace file const ServiceAccountNamespacePath = "/var/run/secrets/kubernetes.io/serviceaccount/namespace" var currentNamespace = LoadNamespace() // LoadNamespace is used to get the current namespace from the file func LoadNamespace() string { data, err := ioutil.ReadFile(ServiceAccountNamespacePath) if err != nil { return "" } return string(data) } // GetNamespace is used to get the namespace of the Pod where the current container is located func GetNamespace() string { return currentNamespace } // GetNamespace is used to get the name of the Pod where the current container is located func GetPodName() string { return os.Getenv("HOSTNAME") } // //////////// ProtocolMap //////////// type protocolMap map[string]string func (m protocolMap) GetProtocol(port int32) string { return m[strconv.Itoa(int(port))] } // //////////// Iterator //////////// // Iterator performs the conversion from channel to iterator // It reads the latest changes from the `chan []*registry.ServiceInstance` // And the outside can sense the closure of Iterator through stopCh type Iterator struct { ch chan []*registry.ServiceInstance stopCh chan struct{} } // NewIterator is used to initialize Iterator func NewIterator(channel chan []*registry.ServiceInstance, stopCh chan struct{}) *Iterator { return &Iterator{ ch: channel, stopCh: stopCh, } } // Next will block until ServiceInstance changes func (iter *Iterator) Next() ([]*registry.ServiceInstance, error) { select { case instances := <-iter.ch: return instances, nil case <-iter.stopCh: return nil, ErrIteratorClosed } } // Close is used to close the iterator func (iter *Iterator) Stop() error { select { case <-iter.stopCh: default: close(iter.stopCh) } return nil } // //////////// Helper Func //////////// func marshal(in interface{}) (string, error) { return jsoniter.MarshalToString(in) } func unmarshal(data string, in interface{}) error { return jsoniter.UnmarshalFromString(data, in) } func isEmptyObjectString(s string) bool { switch s { case "", "{}", "null", "nil", "[]": return true } return false } func getProtocolMapByEndpoints(endpoints []string) (protocolMap, error) { ret := protocolMap{} for _, endpoint := range endpoints { u, err := url.Parse(endpoint) if err != nil { return nil, err } ret[u.Port()] = u.Scheme } return ret, nil } func getProtocolMapFromPod(pod *corev1.Pod) (protocolMap, error) { protocolMap := protocolMap{} if s := pod.Annotations[AnnotationsKeyProtocolMap]; !isEmptyObjectString(s) { err := unmarshal(s, &protocolMap) if err != nil { return nil, &ErrorHandleResource{Namespace: pod.Namespace, Name: pod.Name, Reason: err} } } return protocolMap, nil } func getMetadataFromPod(pod *corev1.Pod) (map[string]string, error) { metadata := map[string]string{} if s := pod.Annotations[AnnotationsKeyMetadata]; !isEmptyObjectString(s) { err := unmarshal(s, &metadata) if err != nil { return nil, &ErrorHandleResource{Namespace: pod.Namespace, Name: pod.Name, Reason: err} } } return metadata, nil } func getServiceInstanceFromPod(pod *corev1.Pod) (*registry.ServiceInstance, error) { podIP := pod.Status.PodIP podLabels := pod.GetLabels() // Get Metadata metadata, err := getMetadataFromPod(pod) if err != nil { return nil, err } // Get Protocols Definition protocolMap, err := getProtocolMapFromPod(pod) if err != nil { return nil, err } // Get Endpoints var endpoints []string for _, container := range pod.Spec.Containers { for _, cp := range container.Ports { port := cp.ContainerPort protocol := protocolMap.GetProtocol(port) if protocol == "" { if cp.Name != "" { protocol = strings.Split(cp.Name, "-")[0] } else { protocol = string(cp.Protocol) } } addr := fmt.Sprintf("%s://%s:%d", protocol, podIP, port) endpoints = append(endpoints, addr) } } return ®istry.ServiceInstance{ ID: podLabels[LabelsKeyServiceID], Name: podLabels[LabelsKeyServiceName], Version: podLabels[LabelsKeyServiceVersion], Metadata: metadata, Endpoints: endpoints, }, nil } // //////////// Error Definition //////////// // ErrIteratorClosed defines the error that the iterator is closed var ErrIteratorClosed = errors.New("iterator closed") // ErrorHandleResource defines the error that cannot handle K8S resources normally type ErrorHandleResource struct { Namespace string Name string Reason error } // Error implements the error interface func (err *ErrorHandleResource) Error() string { return fmt.Sprintf("failed to handle resource(namespace=%s, name=%s): %s", err.Namespace, err.Name, err.Reason) }