kratos/contrib/registry/discovery/impl_discover.go

89 lines
1.9 KiB

package discovery
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
"github.com/go-kratos/kratos/v2/registry"
)
func filterInstancesByZone(ins *disInstancesInfo, zone string) []*registry.ServiceInstance {
zoneInstance, ok := ins.Instances[zone]
if !ok || len(zoneInstance) == 0 {
return nil
}
out := make([]*registry.ServiceInstance, 0, len(zoneInstance))
for _, v := range zoneInstance {
if v == nil {
continue
}
out = append(out, toServiceInstance(v))
}
return out
}
func (d *Discovery) GetService(ctx context.Context, serviceName string) ([]*registry.ServiceInstance, error) {
r := d.resolveBuild(serviceName)
ins, ok := r.fetch(ctx)
if !ok {
return nil, errors.New("Discovery.GetService fetch failed")
}
out := filterInstancesByZone(ins, d.config.Zone)
if len(out) == 0 {
return nil, fmt.Errorf("Discovery.GetService(%s) not found", serviceName)
}
return out, nil
}
func (d *Discovery) Watch(ctx context.Context, serviceName string) (registry.Watcher, error) {
return &watcher{
Resolve: d.resolveBuild(serviceName),
serviceName: serviceName,
cancelCtx: ctx,
}, nil
}
type watcher struct {
*Resolve
cancelCtx context.Context
serviceName string
}
func (w *watcher) Next() ([]*registry.ServiceInstance, error) {
event := w.Resolve.Watch()
select {
case <-event:
// change event come
case <-w.cancelCtx.Done():
return nil, fmt.Errorf("watch context cancelled: %v", w.cancelCtx.Err())
}
ctx, cancel := context.WithTimeout(context.TODO(), 15*time.Second)
defer cancel()
ins, ok := w.Resolve.fetch(ctx)
if !ok {
return nil, errors.New("Discovery.GetService fetch failed")
}
out := filterInstancesByZone(ins, w.Resolve.d.config.Zone)
if len(out) == 0 {
return nil, fmt.Errorf("Discovery.GetService(%s) not found", w.serviceName)
}
return out, nil
}
func (w *watcher) Stop() error {
return w.Resolve.Close()
}