multi zone scheduler sdk

pull/393/head
lintanghui 5 years ago
parent 693e7d3aea
commit 238bb11008
  1. 25
      pkg/naming/discovery/discovery.go
  2. 4
      pkg/naming/etcd/etcd.go
  3. 79
      pkg/naming/naming.go
  4. 178
      pkg/naming/opt.go
  5. 299
      pkg/naming/opt_test.go
  6. 2
      pkg/net/rpc/warden/resolver/direct/direct.go
  7. 71
      pkg/net/rpc/warden/resolver/resolver.go
  8. 125
      pkg/net/rpc/warden/resolver/resolver_test.go
  9. 2
      pkg/net/rpc/warden/resolver/test/mockdiscovery.go

@ -38,6 +38,7 @@ const (
var ( var (
_ naming.Builder = &Discovery{} _ naming.Builder = &Discovery{}
_ naming.Registry = &Discovery{} _ naming.Registry = &Discovery{}
_ naming.Resolver = &Resolve{}
// ErrDuplication duplication treeid. // ErrDuplication duplication treeid.
ErrDuplication = errors.New("discovery: instance duplicate registration") ErrDuplication = errors.New("discovery: instance duplicate registration")
@ -70,11 +71,6 @@ type Config struct {
Host string Host string
} }
type appData struct {
Instances map[string][]*naming.Instance `json:"instances"`
LastTs int64 `json:"latest_timestamp"`
}
// Discovery is discovery client. // Discovery is discovery client.
type Discovery struct { type Discovery struct {
c *Config c *Config
@ -219,11 +215,15 @@ func (d *Discovery) newSelf(zones map[string][]*naming.Instance) {
} }
// Build disovery resovler builder. // Build disovery resovler builder.
func (d *Discovery) Build(appid string) naming.Resolver { func (d *Discovery) Build(appid string, opts ...naming.BuildOpt) naming.Resolver {
r := &Resolve{ r := &Resolve{
id: appid, id: appid,
d: d, d: d,
event: make(chan struct{}, 1), event: make(chan struct{}, 1),
opt: new(naming.BuildOptions),
}
for _, opt := range opts {
opt.Apply(r.opt)
} }
d.mutex.Lock() d.mutex.Lock()
app, ok := d.apps[appid] app, ok := d.apps[appid]
@ -262,6 +262,7 @@ type Resolve struct {
id string id string
event chan struct{} event chan struct{}
d *Discovery d *Discovery
opt *naming.BuildOptions
} }
// Watch watch instance. // Watch watch instance.
@ -276,7 +277,17 @@ func (r *Resolve) Fetch(ctx context.Context) (ins *naming.InstancesInfo, ok bool
r.d.mutex.RUnlock() r.d.mutex.RUnlock()
if ok { if ok {
ins, ok = app.zoneIns.Load().(*naming.InstancesInfo) ins, ok = app.zoneIns.Load().(*naming.InstancesInfo)
return if r.opt.Filter != nil {
ins.Instances = r.opt.Filter(ins.Instances)
}
if r.opt.Scheduler != nil {
ins.Instances[r.opt.ClientZone] = r.opt.Scheduler(ins)
}
if r.opt.Subset != nil && r.opt.SubsetSize != 0 {
for zone, inss := range ins.Instances {
ins.Instances[zone] = r.opt.Subset(inss, r.opt.SubsetSize)
}
}
} }
return return
} }

@ -89,6 +89,7 @@ type Resolve struct {
id string id string
event chan struct{} event chan struct{}
e *EtcdBuilder e *EtcdBuilder
opt *naming.BuildOptions
} }
// New is new a etcdbuilder // New is new a etcdbuilder
@ -119,11 +120,12 @@ func New(c *clientv3.Config) (e *EtcdBuilder, err error) {
} }
// Build disovery resovler builder. // Build disovery resovler builder.
func (e *EtcdBuilder) Build(appid string) naming.Resolver { func (e *EtcdBuilder) Build(appid string, opts ...naming.BuildOpt) naming.Resolver {
r := &Resolve{ r := &Resolve{
id: appid, id: appid,
e: e, e: e,
event: make(chan struct{}, 1), event: make(chan struct{}, 1),
opt: new(naming.BuildOptions),
} }
e.mutex.Lock() e.mutex.Lock()
app, ok := e.apps[appid] app, ok := e.apps[appid]

@ -2,7 +2,6 @@ package naming
import ( import (
"context" "context"
"strconv"
) )
// metadata common key // metadata common key
@ -54,7 +53,7 @@ type Registry interface {
// Builder resolver builder. // Builder resolver builder.
type Builder interface { type Builder interface {
Build(id string) Resolver Build(id string, options ...BuildOpt) Resolver
Scheme() string Scheme() string
} }
@ -62,72 +61,20 @@ type Builder interface {
type InstancesInfo struct { type InstancesInfo struct {
Instances map[string][]*Instance `json:"instances"` Instances map[string][]*Instance `json:"instances"`
LastTs int64 `json:"latest_timestamp"` LastTs int64 `json:"latest_timestamp"`
Scheduler []Zone `json:"scheduler"` Scheduler *Scheduler `json:"scheduler"`
} }
// Zone zone scheduler info. // Scheduler scheduler.
type Zone struct { type Scheduler struct {
Src string `json:"src"` Clients map[string]*ZoneStrategy `json:"clients"`
Dst map[string]int64 `json:"dst"`
} }
// UseScheduler use scheduler info on instances. // ZoneStrategy is the scheduling strategy of all zones
// if instancesInfo contains scheduler info about zone, type ZoneStrategy struct {
// return releated zone's instances weighted by scheduler. Zones map[string]*Strategy `json:"zones"`
// if not,only zone instances be returned. }
func (insInf *InstancesInfo) UseScheduler(zone string) (inss []*Instance) {
var scheduler struct { // Strategy is zone scheduling strategy.
zone []string type Strategy struct {
weights []int64 Weight int64 `json:"weight"`
}
var oriWeights []int64
for _, sch := range insInf.Scheduler {
if sch.Src == zone {
for zone, schWeight := range sch.Dst {
if zins, ok := insInf.Instances[zone]; ok {
var totalWeight int64
for _, ins := range zins {
var weight int64
if weight, _ = strconv.ParseInt(ins.Metadata[MetaWeight], 10, 64); weight <= 0 {
weight = 10
}
totalWeight += weight
}
oriWeights = append(oriWeights, totalWeight)
inss = append(inss, zins...)
}
scheduler.weights = append(scheduler.weights, schWeight)
scheduler.zone = append(scheduler.zone, zone)
}
}
}
if len(inss) == 0 {
var ok bool
if inss, ok = insInf.Instances[zone]; ok {
return
}
for _, v := range insInf.Instances {
inss = append(inss, v...)
}
return
}
var comMulti int64 = 1
for _, weigth := range oriWeights {
comMulti *= weigth
}
var fixWeight = make(map[string]int64, len(scheduler.weights))
for i, zone := range scheduler.zone {
fixWeight[zone] = scheduler.weights[i] * comMulti / oriWeights[i]
}
for _, ins := range inss {
var weight int64
if weight, _ = strconv.ParseInt(ins.Metadata[MetaWeight], 10, 64); weight <= 0 {
weight = 10
}
if fix, ok := fixWeight[ins.Zone]; ok {
weight = weight * fix
}
ins.Metadata[MetaWeight] = strconv.FormatInt(weight, 10)
}
return
} }

@ -0,0 +1,178 @@
package naming
import (
"encoding/json"
"fmt"
"math/rand"
"net/url"
"os"
"sort"
"github.com/bilibili/kratos/pkg/conf/env"
"github.com/bilibili/kratos/pkg/log"
"github.com/dgryski/go-farm"
)
// BuildOptions build options.
type BuildOptions struct {
Filter func(map[string][]*Instance) map[string][]*Instance
Subset func([]*Instance, int) []*Instance
SubsetSize int
ClientZone string
Scheduler func(*InstancesInfo) []*Instance
}
// BuildOpt build option interface.
type BuildOpt interface {
Apply(*BuildOptions)
}
type funcOpt struct {
f func(*BuildOptions)
}
func (f *funcOpt) Apply(opt *BuildOptions) {
f.f(opt)
}
// Filter filter option.
func Filter(schema string, clusters map[string]struct{}) BuildOpt {
return &funcOpt{f: func(opt *BuildOptions) {
opt.Filter = func(inss map[string][]*Instance) map[string][]*Instance {
newInss := make(map[string][]*Instance)
for zone := range inss {
var instances []*Instance
for _, ins := range inss[zone] {
//如果r.clusters的长度大于0说明需要进行集群选择
if len(clusters) > 0 {
if _, ok := clusters[ins.Metadata[MetaCluster]]; !ok {
continue
}
}
var addr string
for _, a := range ins.Addrs {
u, err := url.Parse(a)
if err == nil && u.Scheme == schema {
addr = u.Host
}
}
if addr == "" {
fmt.Fprintf(os.Stderr, "resolver: app(%s,%s) no valid grpc address(%v) found!", ins.AppID, ins.Hostname, ins.Addrs)
log.Warn("resolver: invalid rpc address(%s,%s,%v) found!", ins.AppID, ins.Hostname, ins.Addrs)
continue
}
instances = append(instances, ins)
}
newInss[zone] = instances
}
return newInss
}
}}
}
func defulatSubset(inss []*Instance, size int) []*Instance {
backends := inss
if len(backends) <= int(size) {
return backends
}
clientID := env.Hostname
sort.Slice(backends, func(i, j int) bool {
return backends[i].Hostname < backends[j].Hostname
})
count := len(backends) / size
// hash得到ID
id := farm.Fingerprint64([]byte(clientID))
// 获得rand轮数
round := int64(id / uint64(count))
s := rand.NewSource(round)
ra := rand.New(s)
// 根据source洗牌
ra.Shuffle(len(backends), func(i, j int) {
backends[i], backends[j] = backends[j], backends[i]
})
start := (id % uint64(count)) * uint64(size)
return backends[int(start) : int(start)+int(size)]
}
// Subset Subset option.
func Subset(defaultSize int) BuildOpt {
return &funcOpt{f: func(opt *BuildOptions) {
opt.SubsetSize = defaultSize
opt.Subset = defulatSubset
}}
}
// ScheduleNode ScheduleNode option.
func ScheduleNode(clientZone string) BuildOpt {
return &funcOpt{f: func(opt *BuildOptions) {
opt.ClientZone = clientZone
opt.Scheduler = func(app *InstancesInfo) (instances []*Instance) {
type Zone struct {
inss []*Instance
weight int64
name string
score float64
}
var zones []*Zone
if app.Scheduler != nil {
si, err := json.Marshal(app.Scheduler)
if err == nil {
log.Info("schedule info: %s", string(si))
}
if strategy, ok := app.Scheduler.Clients[clientZone]; ok {
var min *Zone
for name, zone := range strategy.Zones {
inss := app.Instances[name]
if len(inss) == 0 {
continue
}
z := &Zone{
inss: inss,
weight: zone.Weight,
name: name,
score: float64(len(inss)) / float64(zone.Weight),
}
if min == nil || z.score < min.score {
min = z
}
zones = append(zones, z)
}
if opt.SubsetSize != 0 && len(min.inss) > opt.SubsetSize {
min.score = float64(opt.SubsetSize) / float64(min.weight)
}
for _, z := range zones {
nums := int(min.score * float64(z.weight))
if nums == 0 {
nums = 1
}
if nums < len(z.inss) {
if opt.Subset != nil {
z.inss = opt.Subset(z.inss, nums)
} else {
z.inss = defulatSubset(z.inss, nums)
}
}
}
}
}
for _, zone := range zones {
for _, ins := range zone.inss {
instances = append(instances, ins)
}
}
//如果没有拿到节点,则选择直接获取
if len(instances) == 0 {
instances = app.Instances[clientZone]
if len(instances) == 0 {
for _, value := range app.Instances {
instances = append(instances, value...)
}
}
}
return
}
}}
}

@ -0,0 +1,299 @@
package naming
import (
"fmt"
"reflect"
"testing"
)
func Test_Subset(t *testing.T) {
var inss1 []*Instance
for i := 0; i < 200; i++ {
ins := &Instance{
Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)},
Metadata: map[string]string{MetaCluster: "c1"},
}
inss1 = append(inss1, ins)
}
var opt BuildOptions
s := Subset(50)
s.Apply(&opt)
sub1 := opt.Subset(inss1, opt.SubsetSize)
if len(sub1) != 50 {
t.Fatalf("subset size should be 50")
}
sub2 := opt.Subset(inss1, opt.SubsetSize)
if !reflect.DeepEqual(sub1, sub2) {
t.Fatalf("two subsets should equal")
}
}
func Test_FilterClusters(t *testing.T) {
inss := map[string][]*Instance{
"sh001": []*Instance{&Instance{
Addrs: []string{"grpc://127.0.0.1:9000"},
Metadata: map[string]string{MetaCluster: "c1"},
}, &Instance{
Addrs: []string{"http://127.0.0.2:9000"},
Metadata: map[string]string{MetaCluster: "c1"},
}, &Instance{
Addrs: []string{"grpc://127.0.0.3:9000"},
Metadata: map[string]string{MetaCluster: "c2"},
}},
"sh002": []*Instance{&Instance{
Addrs: []string{"grpc://127.0.0.1:9000"},
Metadata: map[string]string{MetaCluster: "c3"},
}, &Instance{
Addrs: []string{"zk://127.0.0.2:9000"},
Metadata: map[string]string{MetaCluster: "c3"},
}},
}
res := map[string][]*Instance{
"sh001": []*Instance{&Instance{
Addrs: []string{"grpc://127.0.0.1:9000"},
Metadata: map[string]string{MetaCluster: "c1"},
}},
"sh002": []*Instance{&Instance{
Addrs: []string{"grpc://127.0.0.1:9000"},
Metadata: map[string]string{MetaCluster: "c3"},
}},
}
var opt BuildOptions
f := Filter("grpc", map[string]struct{}{"c1": struct{}{}, "c3": struct{}{}})
f.Apply(&opt)
filtered := opt.Filter(inss)
equal := reflect.DeepEqual(filtered, res)
if !equal {
t.Fatalf("Filter grpc should equal,filtered:%v expected:%v", filtered, res)
}
}
func Test_FilterInvalidAddr(t *testing.T) {
inss := map[string][]*Instance{
"sh001": []*Instance{&Instance{
Addrs: []string{"grpc://127.0.0.1:9000"},
Metadata: map[string]string{MetaCluster: "c1"},
}, &Instance{
Addrs: []string{"http://127.0.0.2:9000"},
Metadata: map[string]string{MetaCluster: "c1"},
}, &Instance{
Addrs: []string{"grpc://127.0.0.3:9000"},
Metadata: map[string]string{MetaCluster: "c2"},
}},
"sh002": []*Instance{&Instance{
Addrs: []string{"grpc://127.0.0.1:9000"},
Metadata: map[string]string{MetaCluster: "c3"},
}, &Instance{
Addrs: []string{"zk://127.0.0.2:9000"},
Metadata: map[string]string{MetaCluster: "c3"},
}},
}
res := map[string][]*Instance{
"sh001": []*Instance{&Instance{
Addrs: []string{"grpc://127.0.0.1:9000"},
Metadata: map[string]string{MetaCluster: "c1"},
}, &Instance{
Addrs: []string{"grpc://127.0.0.3:9000"},
Metadata: map[string]string{MetaCluster: "c2"},
}},
"sh002": []*Instance{&Instance{
Addrs: []string{"grpc://127.0.0.1:9000"},
Metadata: map[string]string{MetaCluster: "c3"},
}},
}
var opt BuildOptions
f := Filter("grpc", nil)
f.Apply(&opt)
filtered := opt.Filter(inss)
equal := reflect.DeepEqual(filtered, res)
if !equal {
t.Fatalf("Filter grpc should equal,filtered:%v expected:%v", filtered, res)
}
}
func Test_Schedule(t *testing.T) {
app := &InstancesInfo{
Instances: map[string][]*Instance{
"sh001": []*Instance{&Instance{
Zone: "sh001",
Addrs: []string{"grpc://127.0.0.1:9000"},
Metadata: map[string]string{MetaCluster: "c1"},
}, &Instance{
Zone: "sh001",
Addrs: []string{"grpc://127.0.0.2:9000"},
Metadata: map[string]string{MetaCluster: "c1"},
}, &Instance{
Zone: "sh001",
Addrs: []string{"grpc://127.0.0.3:9000"},
Metadata: map[string]string{MetaCluster: "c2"},
}},
"sh002": []*Instance{&Instance{
Zone: "sh002",
Addrs: []string{"grpc://127.0.0.1:9000"},
Metadata: map[string]string{MetaCluster: "c3"},
}, &Instance{
Zone: "sh002",
Addrs: []string{"grpc://127.0.0.2:9000"},
Metadata: map[string]string{MetaCluster: "c3"},
}},
},
Scheduler: &Scheduler{map[string]*ZoneStrategy{"sh001": &ZoneStrategy{
Zones: map[string]*Strategy{
"sh001": &Strategy{10},
"sh002": &Strategy{20},
},
}}},
}
var opt BuildOptions
f := ScheduleNode("sh001")
f.Apply(&opt)
err := compareAddr(opt.Scheduler(app), map[string]int{"sh002": 2, "sh001": 1})
if err != nil {
t.Fatalf(err.Error())
}
}
func Test_Schedule2(t *testing.T) {
app := &InstancesInfo{
Instances: map[string][]*Instance{},
Scheduler: &Scheduler{map[string]*ZoneStrategy{"sh001": &ZoneStrategy{
Zones: map[string]*Strategy{
"sh001": &Strategy{10},
"sh002": &Strategy{20},
},
}}},
}
for i := 0; i < 30; i++ {
ins := &Instance{
Zone: "sh001",
Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)},
Metadata: map[string]string{MetaCluster: "c1"},
}
app.Instances[ins.Zone] = append(app.Instances[ins.Zone], ins)
}
for i := 0; i < 30; i++ {
ins := &Instance{
Zone: "sh002",
Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)},
Metadata: map[string]string{MetaCluster: "c2"},
}
app.Instances[ins.Zone] = append(app.Instances[ins.Zone], ins)
}
var opt BuildOptions
f := ScheduleNode("sh001")
f.Apply(&opt)
err := compareAddr(opt.Scheduler(app), map[string]int{"sh002": 30, "sh001": 15})
if err != nil {
t.Fatalf(err.Error())
}
}
func Test_Schedule3(t *testing.T) {
app := &InstancesInfo{
Instances: map[string][]*Instance{},
Scheduler: &Scheduler{map[string]*ZoneStrategy{"sh001": &ZoneStrategy{
Zones: map[string]*Strategy{
"sh001": &Strategy{1},
"sh002": &Strategy{30},
},
}}},
}
for i := 0; i < 30; i++ {
ins := &Instance{
Zone: "sh001",
Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)},
Metadata: map[string]string{MetaCluster: "c1"},
}
app.Instances[ins.Zone] = append(app.Instances[ins.Zone], ins)
}
for i := 0; i < 30; i++ {
ins := &Instance{
Zone: "sh002",
Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)},
Metadata: map[string]string{MetaCluster: "c2"},
}
app.Instances[ins.Zone] = append(app.Instances[ins.Zone], ins)
}
var opt BuildOptions
f := ScheduleNode("sh001")
f.Apply(&opt)
err := compareAddr(opt.Scheduler(app), map[string]int{"sh002": 30, "sh001": 1})
if err != nil {
t.Fatalf(err.Error())
}
}
func Test_Schedule4(t *testing.T) {
app := &InstancesInfo{
Instances: map[string][]*Instance{},
Scheduler: &Scheduler{map[string]*ZoneStrategy{"sh001": &ZoneStrategy{
Zones: map[string]*Strategy{
"sh001": &Strategy{1},
"sh002": &Strategy{30},
},
}}},
}
for i := 0; i < 30; i++ {
ins := &Instance{
Zone: "sh001",
Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)},
Metadata: map[string]string{MetaCluster: "c1"},
}
app.Instances[ins.Zone] = append(app.Instances[ins.Zone], ins)
}
var opt BuildOptions
f := ScheduleNode("sh001")
f.Apply(&opt)
err := compareAddr(opt.Scheduler(app), map[string]int{"sh001": 30, "sh002": 0})
if err != nil {
t.Fatalf(err.Error())
}
}
func Test_Schedule5(t *testing.T) {
app := &InstancesInfo{
Instances: map[string][]*Instance{},
Scheduler: &Scheduler{map[string]*ZoneStrategy{"sh001": &ZoneStrategy{
Zones: map[string]*Strategy{
"sh002": &Strategy{30},
},
}}},
}
for i := 0; i < 30; i++ {
ins := &Instance{
Zone: "sh001",
Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)},
Metadata: map[string]string{MetaCluster: "c1"},
}
app.Instances[ins.Zone] = append(app.Instances[ins.Zone], ins)
}
for i := 0; i < 30; i++ {
ins := &Instance{
Zone: "sh002",
Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)},
Metadata: map[string]string{MetaCluster: "c2"},
}
app.Instances[ins.Zone] = append(app.Instances[ins.Zone], ins)
}
var opt BuildOptions
f := ScheduleNode("sh001")
f.Apply(&opt)
err := compareAddr(opt.Scheduler(app), map[string]int{"sh002": 30, "sh001": 0})
if err != nil {
t.Fatalf(err.Error())
}
}
func compareAddr(inss []*Instance, c map[string]int) (err error) {
for _, ins := range inss {
c[ins.Zone] = c[ins.Zone] - 1
}
for zone, v := range c {
if v != 0 {
err = fmt.Errorf("zone(%s) nums is %d", zone, v)
return
}
}
return
}

@ -34,7 +34,7 @@ type Direct struct {
} }
// Build direct build. // Build direct build.
func (d *Direct) Build(id string) naming.Resolver { func (d *Direct) Build(id string, opt ...naming.BuildOpt) naming.Resolver {
return &Direct{id: id} return &Direct{id: id}
} }

@ -2,11 +2,7 @@ package resolver
import ( import (
"context" "context"
"fmt"
"math/rand"
"net/url" "net/url"
"os"
"sort"
"strconv" "strconv"
"strings" "strings"
"sync" "sync"
@ -16,7 +12,6 @@ import (
"github.com/bilibili/kratos/pkg/naming" "github.com/bilibili/kratos/pkg/naming"
wmeta "github.com/bilibili/kratos/pkg/net/rpc/warden/internal/metadata" wmeta "github.com/bilibili/kratos/pkg/net/rpc/warden/internal/metadata"
farm "github.com/dgryski/go-farm"
"github.com/pkg/errors" "github.com/pkg/errors"
"google.golang.org/grpc/resolver" "google.golang.org/grpc/resolver"
) )
@ -81,12 +76,10 @@ func (b *Builder) Build(target resolver.Target, cc resolver.ClientConn, opts res
} }
} }
r := &Resolver{ r := &Resolver{
nr: b.Builder.Build(str[0]), nr: b.Builder.Build(str[0], naming.Filter(Scheme, clusters), naming.ScheduleNode(zone), naming.Subset(int(ss))),
cc: cc, cc: cc,
quit: make(chan struct{}, 1), quit: make(chan struct{}, 1),
clusters: clusters,
zone: zone, zone: zone,
subsetSize: ss,
} }
go r.updateproc() go r.updateproc()
return r, nil return r, nil
@ -130,72 +123,14 @@ func (r *Resolver) updateproc() {
} }
if ins, ok := r.nr.Fetch(context.Background()); ok { if ins, ok := r.nr.Fetch(context.Background()); ok {
instances, _ := ins.Instances[r.zone] instances, _ := ins.Instances[r.zone]
res := r.filter(instances) if len(instances) == 0 {
if len(res) == 0 {
for _, value := range ins.Instances { for _, value := range ins.Instances {
instances = append(instances, value...) instances = append(instances, value...)
} }
res = r.filter(instances)
}
r.newAddress(res)
}
} }
} r.newAddress(instances)
func (r *Resolver) filter(backends []*naming.Instance) (instances []*naming.Instance) {
if len(backends) == 0 {
return
}
for _, ins := range backends {
//如果r.clusters的长度大于0说明需要进行集群选择
if _, ok := r.clusters[ins.Metadata[naming.MetaCluster]]; !ok && len(r.clusters) > 0 {
continue
}
var addr string
for _, a := range ins.Addrs {
u, err := url.Parse(a)
if err == nil && u.Scheme == Scheme {
addr = u.Host
} }
} }
if addr == "" {
fmt.Fprintf(os.Stderr, "resolver: app(%s,%s) no valid grpc address(%v) found!", ins.AppID, ins.Hostname, ins.Addrs)
log.Warn("resolver: invalid rpc address(%s,%s,%v) found!", ins.AppID, ins.Hostname, ins.Addrs)
continue
}
instances = append(instances, ins)
}
if len(instances) == 0 {
for _, bkend := range backends {
log.Warn("resolver: backends(%d) invalid instance:%v", len(backends), bkend)
}
return
}
if r.subsetSize > 0 {
instances = r.subset(instances, env.Hostname, r.subsetSize)
}
return
}
func (r *Resolver) subset(backends []*naming.Instance, clientID string, size int64) []*naming.Instance {
if len(backends) <= int(size) {
return backends
}
sort.Slice(backends, func(i, j int) bool {
return backends[i].Hostname < backends[j].Hostname
})
count := int64(len(backends)) / size
id := farm.Fingerprint64([]byte(clientID))
round := int64(id / uint64(count))
s := rand.NewSource(round)
ra := rand.New(s)
ra.Shuffle(len(backends), func(i, j int) {
backends[i], backends[j] = backends[j], backends[i]
})
start := (id % uint64(count)) * uint64(size)
return backends[int(start) : int(start)+int(size)]
} }
func (r *Resolver) newAddress(instances []*naming.Instance) { func (r *Resolver) newAddress(instances []*naming.Instance) {

@ -1,125 +0,0 @@
package resolver
import (
"fmt"
"testing"
"time"
"github.com/bilibili/kratos/pkg/naming"
)
func Test_FilterLittle(t *testing.T) {
var backs []*naming.Instance
for i := 0; i < 3; i++ {
backs = append(backs, &naming.Instance{
Zone: "sh1",
Env: "prod",
AppID: "2233",
Hostname: fmt.Sprintf("linux-%d", i),
Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)},
LastTs: time.Now().Unix(),
})
}
r := &Resolver{
quit: make(chan struct{}, 1),
zone: "sh1",
subsetSize: 50,
}
if len(r.filter(backs)) != 3 {
t.Fatalf("backends length must be 3")
}
}
func Test_FilterBig(t *testing.T) {
var backs []*naming.Instance
for i := 0; i < 100; i++ {
backs = append(backs, &naming.Instance{
Zone: "sh1",
Env: "prod",
AppID: "2233",
Hostname: fmt.Sprintf("linux-%d", i),
Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)},
LastTs: time.Now().Unix(),
})
}
r := &Resolver{
quit: make(chan struct{}, 1),
zone: "sh1",
subsetSize: 50,
}
if len(r.filter(backs)) != 50 {
t.Fatalf("backends length must be 50")
}
}
func Test_FilterNone(t *testing.T) {
var backs []*naming.Instance
for i := 0; i < 100; i++ {
backs = append(backs, &naming.Instance{
Zone: "sh1",
Env: "prod",
AppID: "2233",
Metadata: map[string]string{naming.MetaCluster: "c1"},
Hostname: fmt.Sprintf("linux-%d", i),
Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)},
LastTs: time.Now().Unix(),
})
}
r := &Resolver{
quit: make(chan struct{}, 1),
zone: "sh1",
subsetSize: 50,
clusters: map[string]struct{}{"c2": struct{}{}},
}
if len(r.filter(backs)) != 0 {
t.Fatalf("backends length must be 0")
}
}
func Test_FilterSome(t *testing.T) {
var backs []*naming.Instance
for i := 0; i < 40; i++ {
backs = append(backs, &naming.Instance{
Zone: "sh1",
Env: "prod",
AppID: "2233",
Metadata: map[string]string{naming.MetaCluster: "c1"},
Hostname: fmt.Sprintf("linux-%d", i),
Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)},
LastTs: time.Now().Unix(),
})
}
for i := 50; i < 150; i++ {
backs = append(backs, &naming.Instance{
Zone: "sh1",
Env: "prod",
AppID: "2233",
Metadata: map[string]string{naming.MetaCluster: "c2"},
Hostname: fmt.Sprintf("linux-%d", i),
Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)},
LastTs: time.Now().Unix(),
})
}
r := &Resolver{
quit: make(chan struct{}, 1),
zone: "sh1",
subsetSize: 50,
clusters: map[string]struct{}{"c2": struct{}{}},
}
if len(r.filter(backs)) != 50 {
t.Fatalf("backends length must be 0")
}
r2 := &Resolver{
quit: make(chan struct{}, 1),
zone: "sh1",
subsetSize: 50,
clusters: map[string]struct{}{"c1": struct{}{}},
}
if len(r2.filter(backs)) != 40 {
t.Fatalf("backends length must be 0")
}
}

@ -12,7 +12,7 @@ type mockDiscoveryBuilder struct {
watchch map[string][]*mockDiscoveryResolver watchch map[string][]*mockDiscoveryResolver
} }
func (mb *mockDiscoveryBuilder) Build(id string) naming.Resolver { func (mb *mockDiscoveryBuilder) Build(id string, opts ...naming.BuildOpt) naming.Resolver {
mr := &mockDiscoveryResolver{ mr := &mockDiscoveryResolver{
d: mb, d: mb,
watchch: make(chan struct{}, 1), watchch: make(chan struct{}, 1),

Loading…
Cancel
Save