fix: k8s discovery interface (#1820)

pull/1822/head
Jason C.H 3 years ago committed by GitHub
parent 9662ef3c21
commit 471a2aab79
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 18
      contrib/registry/kubernetes/registry.go

@ -11,6 +11,7 @@ import (
"strings"
"time"
"github.com/go-kratos/kratos/v2/registry"
jsoniter "github.com/json-iterator/go"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -20,8 +21,6 @@ import (
"k8s.io/client-go/kubernetes"
listerv1 "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/tools/cache"
"github.com/go-kratos/kratos/v2/registry"
)
// Defines the key name of specific fields
@ -64,7 +63,6 @@ import (
// ports:
// - containerPort: 80
//
//
const (
// LabelsKeyServiceID is used to define the ID of the service
LabelsKeyServiceID = "kratos-service-id"
@ -159,7 +157,7 @@ func (s *Registry) Deregister(ctx context.Context, service *registry.ServiceInst
}
// Service return the service instances in memory according to the service name.
func (s *Registry) Service(name string) ([]*registry.ServiceInstance, error) {
func (s *Registry) GetService(ctx context.Context, name string) ([]*registry.ServiceInstance, error) {
pods, err := s.podLister.List(labels.SelectorFromSet(map[string]string{
LabelsKeyServiceName: name,
}))
@ -180,8 +178,8 @@ func (s *Registry) Service(name string) ([]*registry.ServiceInstance, error) {
return ret, nil
}
func (s *Registry) sendLatestInstances(name string, announcement chan []*registry.ServiceInstance) {
instances, err := s.Service(name)
func (s *Registry) sendLatestInstances(ctx context.Context, name string, announcement chan []*registry.ServiceInstance) {
instances, err := s.GetService(ctx, name)
if err != nil {
panic(err)
}
@ -189,7 +187,7 @@ func (s *Registry) sendLatestInstances(name string, announcement chan []*registr
}
// Watch creates a watcher according to the service name.
func (s *Registry) Watch(name string) (registry.Watcher, error) {
func (s *Registry) Watch(ctx context.Context, name string) (registry.Watcher, error) {
stopCh := make(chan struct{}, 1)
announcement := make(chan []*registry.ServiceInstance, 1)
s.podInformer.AddEventHandler(cache.FilteringResourceEventHandler{
@ -207,13 +205,13 @@ func (s *Registry) Watch(name string) (registry.Watcher, error) {
},
Handler: cache.ResourceEventHandlerFuncs{
AddFunc: func(obj interface{}) {
s.sendLatestInstances(name, announcement)
s.sendLatestInstances(ctx, name, announcement)
},
UpdateFunc: func(oldObj, newObj interface{}) {
s.sendLatestInstances(name, announcement)
s.sendLatestInstances(ctx, name, announcement)
},
DeleteFunc: func(obj interface{}) {
s.sendLatestInstances(name, announcement)
s.sendLatestInstances(ctx, name, announcement)
},
},
})

Loading…
Cancel
Save