You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
114 lines
2.6 KiB
114 lines
2.6 KiB
package discovery
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"time"
|
|
|
|
"github.com/go-kratos/kratos/v2/internal/endpoint"
|
|
"github.com/go-kratos/kratos/v2/log"
|
|
"github.com/go-kratos/kratos/v2/registry"
|
|
|
|
"github.com/go-kratos/aegis/subset"
|
|
"google.golang.org/grpc/attributes"
|
|
"google.golang.org/grpc/resolver"
|
|
)
|
|
|
|
type discoveryResolver struct {
|
|
w registry.Watcher
|
|
cc resolver.ClientConn
|
|
|
|
ctx context.Context
|
|
cancel context.CancelFunc
|
|
|
|
insecure bool
|
|
debugLog bool
|
|
selecterKey string
|
|
subsetSize int
|
|
}
|
|
|
|
func (r *discoveryResolver) watch() {
|
|
for {
|
|
select {
|
|
case <-r.ctx.Done():
|
|
return
|
|
default:
|
|
}
|
|
ins, err := r.w.Next()
|
|
if err != nil {
|
|
if errors.Is(err, context.Canceled) {
|
|
return
|
|
}
|
|
log.Errorf("[resolver] Failed to watch discovery endpoint: %v", err)
|
|
time.Sleep(time.Second)
|
|
continue
|
|
}
|
|
r.update(ins)
|
|
}
|
|
}
|
|
|
|
func (r *discoveryResolver) update(ins []*registry.ServiceInstance) {
|
|
addrs := make([]resolver.Address, 0)
|
|
endpoints := make(map[string]struct{})
|
|
filtered := make([]*registry.ServiceInstance, 0, len(ins))
|
|
for _, in := range ins {
|
|
ept, err := endpoint.ParseEndpoint(in.Endpoints, endpoint.Scheme("grpc", !r.insecure))
|
|
if err != nil {
|
|
log.Errorf("[resolver] Failed to parse discovery endpoint: %v", err)
|
|
continue
|
|
}
|
|
if ept == "" {
|
|
continue
|
|
}
|
|
// filter redundant endpoints
|
|
if _, ok := endpoints[ept]; ok {
|
|
continue
|
|
}
|
|
filtered = append(filtered, in)
|
|
}
|
|
if r.subsetSize != 0 {
|
|
filtered = subset.Subset(r.selecterKey, filtered, r.subsetSize)
|
|
}
|
|
for _, in := range filtered {
|
|
ept, _ := endpoint.ParseEndpoint(in.Endpoints, endpoint.Scheme("grpc", !r.insecure))
|
|
endpoints[ept] = struct{}{}
|
|
addr := resolver.Address{
|
|
ServerName: in.Name,
|
|
Attributes: parseAttributes(in.Metadata),
|
|
Addr: ept,
|
|
}
|
|
addr.Attributes = addr.Attributes.WithValue("rawServiceInstance", in)
|
|
addrs = append(addrs, addr)
|
|
}
|
|
if len(addrs) == 0 {
|
|
log.Warnf("[resolver] Zero endpoint found,refused to write, instances: %v", ins)
|
|
return
|
|
}
|
|
err := r.cc.UpdateState(resolver.State{Addresses: addrs})
|
|
if err != nil {
|
|
log.Errorf("[resolver] failed to update state: %s", err)
|
|
}
|
|
|
|
if r.debugLog {
|
|
b, _ := json.Marshal(filtered)
|
|
log.Infof("[resolver] update instances: %s", b)
|
|
}
|
|
}
|
|
|
|
func (r *discoveryResolver) Close() {
|
|
r.cancel()
|
|
err := r.w.Stop()
|
|
if err != nil {
|
|
log.Errorf("[resolver] failed to watch top: %s", err)
|
|
}
|
|
}
|
|
|
|
// ResolveNow is a no-op: the options argument is ignored and nothing is
// re-resolved on demand. Address updates are produced by the watch loop.
func (r *discoveryResolver) ResolveNow(_ resolver.ResolveNowOptions) {}
|
|
|
|
func parseAttributes(md map[string]string) (a *attributes.Attributes) {
|
|
for k, v := range md {
|
|
a = a.WithValue(k, v)
|
|
}
|
|
return a
|
|
}
|
|
|