From 238bb1100881d784a971e3aa8120a34f21fc6868 Mon Sep 17 00:00:00 2001
From: lintanghui
Date: Wed, 23 Oct 2019 10:50:29 +0800
Subject: [PATCH] multi zone scheduler sdk

---
 pkg/naming/discovery/discovery.go             |  25 +-
 pkg/naming/etcd/etcd.go                       |   4 +-
 pkg/naming/naming.go                          |  79 +----
 pkg/naming/opt.go                             | 178 +++++++++++
 pkg/naming/opt_test.go                        | 299 ++++++++++++++++++
 pkg/net/rpc/warden/resolver/direct/direct.go  |   2 +-
 pkg/net/rpc/warden/resolver/resolver.go       |  77 +----
 pkg/net/rpc/warden/resolver/resolver_test.go  | 125 --------
 .../rpc/warden/resolver/test/mockdiscovery.go |   2 +-
 9 files changed, 519 insertions(+), 272 deletions(-)
 create mode 100644 pkg/naming/opt.go
 create mode 100644 pkg/naming/opt_test.go
 delete mode 100644 pkg/net/rpc/warden/resolver/resolver_test.go

diff --git a/pkg/naming/discovery/discovery.go b/pkg/naming/discovery/discovery.go
index 9876315a1..088304d25 100644
--- a/pkg/naming/discovery/discovery.go
+++ b/pkg/naming/discovery/discovery.go
@@ -38,6 +38,7 @@ const (
 var (
 	_ naming.Builder  = &Discovery{}
 	_ naming.Registry = &Discovery{}
+	_ naming.Resolver = &Resolve{}
 
 	// ErrDuplication duplication treeid.
 	ErrDuplication = errors.New("discovery: instance duplicate registration")
@@ -70,11 +71,6 @@ type Config struct {
 	Host   string
 }
 
-type appData struct {
-	Instances map[string][]*naming.Instance `json:"instances"`
-	LastTs    int64                         `json:"latest_timestamp"`
-}
-
 // Discovery is discovery client.
 type Discovery struct {
 	c      *Config
@@ -219,11 +215,15 @@ func (d *Discovery) newSelf(zones map[string][]*naming.Instance) {
 }
 
 // Build discovery resolver builder.
-func (d *Discovery) Build(appid string) naming.Resolver {
+func (d *Discovery) Build(appid string, opts ...naming.BuildOpt) naming.Resolver {
 	r := &Resolve{
 		id:    appid,
 		d:     d,
 		event: make(chan struct{}, 1),
+		opt:   new(naming.BuildOptions),
+	}
+	for _, opt := range opts {
+		opt.Apply(r.opt)
 	}
 	d.mutex.Lock()
 	app, ok := d.apps[appid]
@@ -262,6 +262,7 @@ type Resolve struct {
 	id    string
 	event chan struct{}
 	d     *Discovery
+	opt   *naming.BuildOptions
 }
 
 // Watch watch instance.
@@ -276,7 +277,17 @@ func (r *Resolve) Fetch(ctx context.Context) (ins *naming.InstancesInfo, ok bool
 	r.d.mutex.RUnlock()
 	if ok {
 		ins, ok = app.zoneIns.Load().(*naming.InstancesInfo)
-		return
+		if r.opt.Filter != nil {
+			ins.Instances = r.opt.Filter(ins.Instances)
+		}
+		if r.opt.Scheduler != nil {
+			ins.Instances[r.opt.ClientZone] = r.opt.Scheduler(ins)
+		}
+		if r.opt.Subset != nil && r.opt.SubsetSize != 0 {
+			for zone, inss := range ins.Instances {
+				ins.Instances[zone] = r.opt.Subset(inss, r.opt.SubsetSize)
+			}
+		}
 	}
 	return
 }
diff --git a/pkg/naming/etcd/etcd.go b/pkg/naming/etcd/etcd.go
index 241b192f3..a946a0d6d 100644
--- a/pkg/naming/etcd/etcd.go
+++ b/pkg/naming/etcd/etcd.go
@@ -89,6 +89,7 @@ type Resolve struct {
 	id    string
 	event chan struct{}
 	e     *EtcdBuilder
+	opt   *naming.BuildOptions
 }
 
 // New creates a new etcd builder.
@@ -119,11 +120,12 @@ func New(c *clientv3.Config) (e *EtcdBuilder, err error) {
 }
 
 // Build discovery resolver builder.
-func (e *EtcdBuilder) Build(appid string) naming.Resolver { +func (e *EtcdBuilder) Build(appid string, opts ...naming.BuildOpt) naming.Resolver { r := &Resolve{ id: appid, e: e, event: make(chan struct{}, 1), + opt: new(naming.BuildOptions), } e.mutex.Lock() app, ok := e.apps[appid] diff --git a/pkg/naming/naming.go b/pkg/naming/naming.go index 5e0d76348..936faf626 100644 --- a/pkg/naming/naming.go +++ b/pkg/naming/naming.go @@ -2,7 +2,6 @@ package naming import ( "context" - "strconv" ) // metadata common key @@ -54,7 +53,7 @@ type Registry interface { // Builder resolver builder. type Builder interface { - Build(id string) Resolver + Build(id string, options ...BuildOpt) Resolver Scheme() string } @@ -62,72 +61,20 @@ type Builder interface { type InstancesInfo struct { Instances map[string][]*Instance `json:"instances"` LastTs int64 `json:"latest_timestamp"` - Scheduler []Zone `json:"scheduler"` + Scheduler *Scheduler `json:"scheduler"` } -// Zone zone scheduler info. -type Zone struct { - Src string `json:"src"` - Dst map[string]int64 `json:"dst"` +// Scheduler scheduler. +type Scheduler struct { + Clients map[string]*ZoneStrategy `json:"clients"` } -// UseScheduler use scheduler info on instances. -// if instancesInfo contains scheduler info about zone, -// return releated zone's instances weighted by scheduler. -// if not,only zone instances be returned. -func (insInf *InstancesInfo) UseScheduler(zone string) (inss []*Instance) { - var scheduler struct { - zone []string - weights []int64 - } - var oriWeights []int64 - for _, sch := range insInf.Scheduler { - if sch.Src == zone { - for zone, schWeight := range sch.Dst { - if zins, ok := insInf.Instances[zone]; ok { - var totalWeight int64 - for _, ins := range zins { - var weight int64 - if weight, _ = strconv.ParseInt(ins.Metadata[MetaWeight], 10, 64); weight <= 0 { - weight = 10 - } - totalWeight += weight - } - oriWeights = append(oriWeights, totalWeight) - inss = append(inss, zins...) - } - scheduler.weights = append(scheduler.weights, schWeight) - scheduler.zone = append(scheduler.zone, zone) - } - } - } - if len(inss) == 0 { - var ok bool - if inss, ok = insInf.Instances[zone]; ok { - return - } - for _, v := range insInf.Instances { - inss = append(inss, v...) - } - return - } - var comMulti int64 = 1 - for _, weigth := range oriWeights { - comMulti *= weigth - } - var fixWeight = make(map[string]int64, len(scheduler.weights)) - for i, zone := range scheduler.zone { - fixWeight[zone] = scheduler.weights[i] * comMulti / oriWeights[i] - } - for _, ins := range inss { - var weight int64 - if weight, _ = strconv.ParseInt(ins.Metadata[MetaWeight], 10, 64); weight <= 0 { - weight = 10 - } - if fix, ok := fixWeight[ins.Zone]; ok { - weight = weight * fix - } - ins.Metadata[MetaWeight] = strconv.FormatInt(weight, 10) - } - return +// ZoneStrategy is the scheduling strategy of all zones +type ZoneStrategy struct { + Zones map[string]*Strategy `json:"zones"` +} + +// Strategy is zone scheduling strategy. +type Strategy struct { + Weight int64 `json:"weight"` } diff --git a/pkg/naming/opt.go b/pkg/naming/opt.go new file mode 100644 index 000000000..e49c9f01a --- /dev/null +++ b/pkg/naming/opt.go @@ -0,0 +1,178 @@ +package naming + +import ( + "encoding/json" + "fmt" + "math/rand" + "net/url" + "os" + "sort" + + "github.com/bilibili/kratos/pkg/conf/env" + "github.com/bilibili/kratos/pkg/log" + + "github.com/dgryski/go-farm" +) + +// BuildOptions build options. 
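+// Filter drops unwanted instances zone by zone, Scheduler redistributes the
+// client zone's instances according to the per-zone weights, and Subset caps
+// each zone at SubsetSize backends; Resolve.Fetch applies them in that order.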
+type BuildOptions struct {
+	Filter     func(map[string][]*Instance) map[string][]*Instance
+	Subset     func([]*Instance, int) []*Instance
+	SubsetSize int
+	ClientZone string
+	Scheduler  func(*InstancesInfo) []*Instance
+}
+
+// BuildOpt build option interface.
+type BuildOpt interface {
+	Apply(*BuildOptions)
+}
+
+type funcOpt struct {
+	f func(*BuildOptions)
+}
+
+func (f *funcOpt) Apply(opt *BuildOptions) {
+	f.f(opt)
+}
+
+// Filter filter option.
+func Filter(schema string, clusters map[string]struct{}) BuildOpt {
+	return &funcOpt{f: func(opt *BuildOptions) {
+		opt.Filter = func(inss map[string][]*Instance) map[string][]*Instance {
+			newInss := make(map[string][]*Instance)
+			for zone := range inss {
+				var instances []*Instance
+				for _, ins := range inss[zone] {
+					// a non-empty clusters set means cluster selection is required
+					if len(clusters) > 0 {
+						if _, ok := clusters[ins.Metadata[MetaCluster]]; !ok {
+							continue
+						}
+					}
+					var addr string
+					for _, a := range ins.Addrs {
+						u, err := url.Parse(a)
+						if err == nil && u.Scheme == schema {
+							addr = u.Host
+						}
+					}
+					if addr == "" {
+						fmt.Fprintf(os.Stderr, "resolver: app(%s,%s) no valid grpc address(%v) found!\n", ins.AppID, ins.Hostname, ins.Addrs)
+						log.Warn("resolver: invalid rpc address(%s,%s,%v) found!", ins.AppID, ins.Hostname, ins.Addrs)
+						continue
+					}
+					instances = append(instances, ins)
+				}
+				newInss[zone] = instances
+			}
+			return newInss
+		}
+	}}
+}
+
+func defaultSubset(inss []*Instance, size int) []*Instance {
+	backends := inss
+	if len(backends) <= size {
+		return backends
+	}
+	clientID := env.Hostname
+	sort.Slice(backends, func(i, j int) bool {
+		return backends[i].Hostname < backends[j].Hostname
+	})
+	count := len(backends) / size
+	// hash the client ID
+	id := farm.Fingerprint64([]byte(clientID))
+	// derive the shuffle round from the ID
+	round := int64(id / uint64(count))
+
+	s := rand.NewSource(round)
+	ra := rand.New(s)
+	// shuffle the backends with the seeded source
+	ra.Shuffle(len(backends), func(i, j int) {
+		backends[i], backends[j] = backends[j], backends[i]
+	})
+	start := (id % uint64(count)) * uint64(size)
+	return backends[int(start) : int(start)+size]
+}
+
+// Subset Subset option.
+func Subset(defaultSize int) BuildOpt {
+	return &funcOpt{f: func(opt *BuildOptions) {
+		opt.SubsetSize = defaultSize
+		opt.Subset = defaultSubset
+	}}
+}
+
+// ScheduleNode ScheduleNode option.
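+// Each candidate zone gets a score of len(instances)/weight; the zone with
+// the lowest score anchors the ratio, and every zone is then capped at
+// int(minScore * weight) instances (at least 1), so traffic spreads across
+// zones in proportion to the configured weights.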
+func ScheduleNode(clientZone string) BuildOpt {
+	return &funcOpt{f: func(opt *BuildOptions) {
+		opt.ClientZone = clientZone
+		opt.Scheduler = func(app *InstancesInfo) (instances []*Instance) {
+			type Zone struct {
+				inss   []*Instance
+				weight int64
+				name   string
+				score  float64
+			}
+			var zones []*Zone
+
+			if app.Scheduler != nil {
+				si, err := json.Marshal(app.Scheduler)
+				if err == nil {
+					log.Info("schedule info: %s", string(si))
+				}
+				if strategy, ok := app.Scheduler.Clients[clientZone]; ok {
+					var min *Zone
+					for name, zone := range strategy.Zones {
+						inss := app.Instances[name]
+						if len(inss) == 0 {
+							continue
+						}
+						z := &Zone{
+							inss:   inss,
+							weight: zone.Weight,
+							name:   name,
+							score:  float64(len(inss)) / float64(zone.Weight),
+						}
+						if min == nil || z.score < min.score {
+							min = z
+						}
+						zones = append(zones, z)
+					}
+					if min != nil && opt.SubsetSize != 0 && len(min.inss) > opt.SubsetSize {
+						min.score = float64(opt.SubsetSize) / float64(min.weight)
+					}
+					for _, z := range zones {
+						nums := int(min.score * float64(z.weight))
+						if nums == 0 {
+							nums = 1
+						}
+						if nums < len(z.inss) {
+							if opt.Subset != nil {
+								z.inss = opt.Subset(z.inss, nums)
+							} else {
+								z.inss = defaultSubset(z.inss, nums)
+							}
+						}
+					}
+				}
+			}
+			for _, zone := range zones {
+				for _, ins := range zone.inss {
+					instances = append(instances, ins)
+				}
+			}
+			// if scheduling yielded no instances, fall back to the client zone, then to all zones
+			if len(instances) == 0 {
+				instances = app.Instances[clientZone]
+				if len(instances) == 0 {
+					for _, value := range app.Instances {
+						instances = append(instances, value...)
+					}
+				}
+			}
+			return
+		}
+	}}
+}
diff --git a/pkg/naming/opt_test.go b/pkg/naming/opt_test.go
new file mode 100644
index 000000000..8d42cbe5f
--- /dev/null
+++ b/pkg/naming/opt_test.go
@@ -0,0 +1,299 @@
+package naming
+
+import (
+	"fmt"
+	"reflect"
+	"testing"
+)
+
+func Test_Subset(t *testing.T) {
+	var inss1 []*Instance
+	for i := 0; i < 200; i++ {
+		ins := &Instance{
+			Addrs:    []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)},
+			Metadata: map[string]string{MetaCluster: "c1"},
+		}
+		inss1 = append(inss1, ins)
+	}
+	var opt BuildOptions
+	s := Subset(50)
+	s.Apply(&opt)
+	sub1 := opt.Subset(inss1, opt.SubsetSize)
+	if len(sub1) != 50 {
+		t.Fatalf("subset size should be 50")
+	}
+	sub2 := opt.Subset(inss1, opt.SubsetSize)
+	if !reflect.DeepEqual(sub1, sub2) {
+		t.Fatalf("two subsets should be equal")
+	}
+}
+
+func Test_FilterClusters(t *testing.T) {
+	inss := map[string][]*Instance{
+		"sh001": []*Instance{&Instance{
+			Addrs:    []string{"grpc://127.0.0.1:9000"},
+			Metadata: map[string]string{MetaCluster: "c1"},
+		}, &Instance{
+			Addrs:    []string{"http://127.0.0.2:9000"},
+			Metadata: map[string]string{MetaCluster: "c1"},
+		}, &Instance{
+			Addrs:    []string{"grpc://127.0.0.3:9000"},
+			Metadata: map[string]string{MetaCluster: "c2"},
+		}},
+		"sh002": []*Instance{&Instance{
+			Addrs:    []string{"grpc://127.0.0.1:9000"},
+			Metadata: map[string]string{MetaCluster: "c3"},
+		}, &Instance{
+			Addrs:    []string{"zk://127.0.0.2:9000"},
+			Metadata: map[string]string{MetaCluster: "c3"},
+		}},
+	}
+	res := map[string][]*Instance{
+		"sh001": []*Instance{&Instance{
+			Addrs:    []string{"grpc://127.0.0.1:9000"},
+			Metadata: map[string]string{MetaCluster: "c1"},
+		}},
+		"sh002": []*Instance{&Instance{
+			Addrs:    []string{"grpc://127.0.0.1:9000"},
+			Metadata: map[string]string{MetaCluster: "c3"},
+		}},
+	}
+	var opt BuildOptions
+	f := Filter("grpc", map[string]struct{}{"c1": struct{}{}, "c3": struct{}{}})
+	f.Apply(&opt)
+	filtered := opt.Filter(inss)
+	equal := reflect.DeepEqual(filtered, res)
+	if !equal {
t.Fatalf("Filter grpc should equal,filtered:%v expected:%v", filtered, res) + } +} + +func Test_FilterInvalidAddr(t *testing.T) { + inss := map[string][]*Instance{ + "sh001": []*Instance{&Instance{ + Addrs: []string{"grpc://127.0.0.1:9000"}, + Metadata: map[string]string{MetaCluster: "c1"}, + }, &Instance{ + Addrs: []string{"http://127.0.0.2:9000"}, + Metadata: map[string]string{MetaCluster: "c1"}, + }, &Instance{ + Addrs: []string{"grpc://127.0.0.3:9000"}, + Metadata: map[string]string{MetaCluster: "c2"}, + }}, + "sh002": []*Instance{&Instance{ + Addrs: []string{"grpc://127.0.0.1:9000"}, + Metadata: map[string]string{MetaCluster: "c3"}, + }, &Instance{ + Addrs: []string{"zk://127.0.0.2:9000"}, + Metadata: map[string]string{MetaCluster: "c3"}, + }}, + } + res := map[string][]*Instance{ + "sh001": []*Instance{&Instance{ + Addrs: []string{"grpc://127.0.0.1:9000"}, + Metadata: map[string]string{MetaCluster: "c1"}, + }, &Instance{ + Addrs: []string{"grpc://127.0.0.3:9000"}, + Metadata: map[string]string{MetaCluster: "c2"}, + }}, + "sh002": []*Instance{&Instance{ + Addrs: []string{"grpc://127.0.0.1:9000"}, + Metadata: map[string]string{MetaCluster: "c3"}, + }}, + } + var opt BuildOptions + f := Filter("grpc", nil) + f.Apply(&opt) + filtered := opt.Filter(inss) + equal := reflect.DeepEqual(filtered, res) + if !equal { + t.Fatalf("Filter grpc should equal,filtered:%v expected:%v", filtered, res) + } +} + +func Test_Schedule(t *testing.T) { + app := &InstancesInfo{ + Instances: map[string][]*Instance{ + "sh001": []*Instance{&Instance{ + Zone: "sh001", + Addrs: []string{"grpc://127.0.0.1:9000"}, + Metadata: map[string]string{MetaCluster: "c1"}, + }, &Instance{ + Zone: "sh001", + Addrs: []string{"grpc://127.0.0.2:9000"}, + Metadata: map[string]string{MetaCluster: "c1"}, + }, &Instance{ + Zone: "sh001", + Addrs: []string{"grpc://127.0.0.3:9000"}, + Metadata: map[string]string{MetaCluster: "c2"}, + }}, + "sh002": []*Instance{&Instance{ + Zone: "sh002", + Addrs: []string{"grpc://127.0.0.1:9000"}, + Metadata: map[string]string{MetaCluster: "c3"}, + }, &Instance{ + Zone: "sh002", + Addrs: []string{"grpc://127.0.0.2:9000"}, + Metadata: map[string]string{MetaCluster: "c3"}, + }}, + }, + Scheduler: &Scheduler{map[string]*ZoneStrategy{"sh001": &ZoneStrategy{ + Zones: map[string]*Strategy{ + "sh001": &Strategy{10}, + "sh002": &Strategy{20}, + }, + }}}, + } + var opt BuildOptions + f := ScheduleNode("sh001") + f.Apply(&opt) + err := compareAddr(opt.Scheduler(app), map[string]int{"sh002": 2, "sh001": 1}) + if err != nil { + t.Fatalf(err.Error()) + } +} + +func Test_Schedule2(t *testing.T) { + app := &InstancesInfo{ + Instances: map[string][]*Instance{}, + Scheduler: &Scheduler{map[string]*ZoneStrategy{"sh001": &ZoneStrategy{ + Zones: map[string]*Strategy{ + "sh001": &Strategy{10}, + "sh002": &Strategy{20}, + }, + }}}, + } + for i := 0; i < 30; i++ { + ins := &Instance{ + Zone: "sh001", + Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)}, + Metadata: map[string]string{MetaCluster: "c1"}, + } + app.Instances[ins.Zone] = append(app.Instances[ins.Zone], ins) + } + for i := 0; i < 30; i++ { + ins := &Instance{ + Zone: "sh002", + Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)}, + Metadata: map[string]string{MetaCluster: "c2"}, + } + app.Instances[ins.Zone] = append(app.Instances[ins.Zone], ins) + } + var opt BuildOptions + f := ScheduleNode("sh001") + f.Apply(&opt) + err := compareAddr(opt.Scheduler(app), map[string]int{"sh002": 30, "sh001": 15}) + if err != nil { + t.Fatalf(err.Error()) + } +} + 
+func Test_Schedule3(t *testing.T) {
+	app := &InstancesInfo{
+		Instances: map[string][]*Instance{},
+		Scheduler: &Scheduler{map[string]*ZoneStrategy{"sh001": &ZoneStrategy{
+			Zones: map[string]*Strategy{
+				"sh001": &Strategy{1},
+				"sh002": &Strategy{30},
+			},
+		}}},
+	}
+	for i := 0; i < 30; i++ {
+		ins := &Instance{
+			Zone:     "sh001",
+			Addrs:    []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)},
+			Metadata: map[string]string{MetaCluster: "c1"},
+		}
+		app.Instances[ins.Zone] = append(app.Instances[ins.Zone], ins)
+	}
+	for i := 0; i < 30; i++ {
+		ins := &Instance{
+			Zone:     "sh002",
+			Addrs:    []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)},
+			Metadata: map[string]string{MetaCluster: "c2"},
+		}
+		app.Instances[ins.Zone] = append(app.Instances[ins.Zone], ins)
+	}
+	var opt BuildOptions
+	f := ScheduleNode("sh001")
+	f.Apply(&opt)
+	err := compareAddr(opt.Scheduler(app), map[string]int{"sh002": 30, "sh001": 1})
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func Test_Schedule4(t *testing.T) {
+	app := &InstancesInfo{
+		Instances: map[string][]*Instance{},
+		Scheduler: &Scheduler{map[string]*ZoneStrategy{"sh001": &ZoneStrategy{
+			Zones: map[string]*Strategy{
+				"sh001": &Strategy{1},
+				"sh002": &Strategy{30},
+			},
+		}}},
+	}
+	for i := 0; i < 30; i++ {
+		ins := &Instance{
+			Zone:     "sh001",
+			Addrs:    []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)},
+			Metadata: map[string]string{MetaCluster: "c1"},
+		}
+		app.Instances[ins.Zone] = append(app.Instances[ins.Zone], ins)
+	}
+
+	var opt BuildOptions
+	f := ScheduleNode("sh001")
+	f.Apply(&opt)
+	err := compareAddr(opt.Scheduler(app), map[string]int{"sh001": 30, "sh002": 0})
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func Test_Schedule5(t *testing.T) {
+	app := &InstancesInfo{
+		Instances: map[string][]*Instance{},
+		Scheduler: &Scheduler{map[string]*ZoneStrategy{"sh001": &ZoneStrategy{
+			Zones: map[string]*Strategy{
+				"sh002": &Strategy{30},
+			},
+		}}},
+	}
+	for i := 0; i < 30; i++ {
+		ins := &Instance{
+			Zone:     "sh001",
+			Addrs:    []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)},
+			Metadata: map[string]string{MetaCluster: "c1"},
+		}
+		app.Instances[ins.Zone] = append(app.Instances[ins.Zone], ins)
+	}
+	for i := 0; i < 30; i++ {
+		ins := &Instance{
+			Zone:     "sh002",
+			Addrs:    []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)},
+			Metadata: map[string]string{MetaCluster: "c2"},
+		}
+		app.Instances[ins.Zone] = append(app.Instances[ins.Zone], ins)
+	}
+	var opt BuildOptions
+	f := ScheduleNode("sh001")
+	f.Apply(&opt)
+	err := compareAddr(opt.Scheduler(app), map[string]int{"sh002": 30, "sh001": 0})
+	if err != nil {
+		t.Fatal(err)
+	}
+}
+
+func compareAddr(inss []*Instance, c map[string]int) (err error) {
+	for _, ins := range inss {
+		c[ins.Zone] = c[ins.Zone] - 1
+	}
+	for zone, v := range c {
+		if v != 0 {
+			err = fmt.Errorf("zone(%s) nums is %d", zone, v)
+			return
+		}
+	}
+	return
+}
diff --git a/pkg/net/rpc/warden/resolver/direct/direct.go b/pkg/net/rpc/warden/resolver/direct/direct.go
index 2db24d295..0445b978a 100644
--- a/pkg/net/rpc/warden/resolver/direct/direct.go
+++ b/pkg/net/rpc/warden/resolver/direct/direct.go
@@ -34,7 +34,7 @@ type Direct struct {
 }
 
 // Build direct build.
-func (d *Direct) Build(id string) naming.Resolver { +func (d *Direct) Build(id string, opt ...naming.BuildOpt) naming.Resolver { return &Direct{id: id} } diff --git a/pkg/net/rpc/warden/resolver/resolver.go b/pkg/net/rpc/warden/resolver/resolver.go index 4804e148e..1b5f51141 100644 --- a/pkg/net/rpc/warden/resolver/resolver.go +++ b/pkg/net/rpc/warden/resolver/resolver.go @@ -2,11 +2,7 @@ package resolver import ( "context" - "fmt" - "math/rand" "net/url" - "os" - "sort" "strconv" "strings" "sync" @@ -16,7 +12,6 @@ import ( "github.com/bilibili/kratos/pkg/naming" wmeta "github.com/bilibili/kratos/pkg/net/rpc/warden/internal/metadata" - farm "github.com/dgryski/go-farm" "github.com/pkg/errors" "google.golang.org/grpc/resolver" ) @@ -81,12 +76,10 @@ func (b *Builder) Build(target resolver.Target, cc resolver.ClientConn, opts res } } r := &Resolver{ - nr: b.Builder.Build(str[0]), - cc: cc, - quit: make(chan struct{}, 1), - clusters: clusters, - zone: zone, - subsetSize: ss, + nr: b.Builder.Build(str[0], naming.Filter(Scheme, clusters), naming.ScheduleNode(zone), naming.Subset(int(ss))), + cc: cc, + quit: make(chan struct{}, 1), + zone: zone, } go r.updateproc() return r, nil @@ -130,74 +123,16 @@ func (r *Resolver) updateproc() { } if ins, ok := r.nr.Fetch(context.Background()); ok { instances, _ := ins.Instances[r.zone] - res := r.filter(instances) - if len(res) == 0 { + if len(instances) == 0 { for _, value := range ins.Instances { instances = append(instances, value...) } - res = r.filter(instances) } - r.newAddress(res) + r.newAddress(instances) } } } -func (r *Resolver) filter(backends []*naming.Instance) (instances []*naming.Instance) { - if len(backends) == 0 { - return - } - for _, ins := range backends { - //如果r.clusters的长度大于0说明需要进行集群选择 - if _, ok := r.clusters[ins.Metadata[naming.MetaCluster]]; !ok && len(r.clusters) > 0 { - continue - } - var addr string - for _, a := range ins.Addrs { - u, err := url.Parse(a) - if err == nil && u.Scheme == Scheme { - addr = u.Host - } - } - if addr == "" { - fmt.Fprintf(os.Stderr, "resolver: app(%s,%s) no valid grpc address(%v) found!", ins.AppID, ins.Hostname, ins.Addrs) - log.Warn("resolver: invalid rpc address(%s,%s,%v) found!", ins.AppID, ins.Hostname, ins.Addrs) - continue - } - instances = append(instances, ins) - } - if len(instances) == 0 { - for _, bkend := range backends { - log.Warn("resolver: backends(%d) invalid instance:%v", len(backends), bkend) - } - return - } - if r.subsetSize > 0 { - instances = r.subset(instances, env.Hostname, r.subsetSize) - } - return -} - -func (r *Resolver) subset(backends []*naming.Instance, clientID string, size int64) []*naming.Instance { - if len(backends) <= int(size) { - return backends - } - sort.Slice(backends, func(i, j int) bool { - return backends[i].Hostname < backends[j].Hostname - }) - count := int64(len(backends)) / size - - id := farm.Fingerprint64([]byte(clientID)) - round := int64(id / uint64(count)) - - s := rand.NewSource(round) - ra := rand.New(s) - ra.Shuffle(len(backends), func(i, j int) { - backends[i], backends[j] = backends[j], backends[i] - }) - start := (id % uint64(count)) * uint64(size) - return backends[int(start) : int(start)+int(size)] -} - func (r *Resolver) newAddress(instances []*naming.Instance) { if len(instances) <= 0 { return diff --git a/pkg/net/rpc/warden/resolver/resolver_test.go b/pkg/net/rpc/warden/resolver/resolver_test.go deleted file mode 100644 index 54b0998c1..000000000 --- a/pkg/net/rpc/warden/resolver/resolver_test.go +++ /dev/null @@ -1,125 +0,0 @@ 
-package resolver - -import ( - "fmt" - "testing" - "time" - - "github.com/bilibili/kratos/pkg/naming" -) - -func Test_FilterLittle(t *testing.T) { - var backs []*naming.Instance - for i := 0; i < 3; i++ { - backs = append(backs, &naming.Instance{ - Zone: "sh1", - Env: "prod", - AppID: "2233", - Hostname: fmt.Sprintf("linux-%d", i), - Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)}, - LastTs: time.Now().Unix(), - }) - } - r := &Resolver{ - quit: make(chan struct{}, 1), - zone: "sh1", - subsetSize: 50, - } - - if len(r.filter(backs)) != 3 { - t.Fatalf("backends length must be 3") - } -} - -func Test_FilterBig(t *testing.T) { - var backs []*naming.Instance - for i := 0; i < 100; i++ { - backs = append(backs, &naming.Instance{ - Zone: "sh1", - Env: "prod", - AppID: "2233", - Hostname: fmt.Sprintf("linux-%d", i), - Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)}, - LastTs: time.Now().Unix(), - }) - } - r := &Resolver{ - quit: make(chan struct{}, 1), - zone: "sh1", - subsetSize: 50, - } - - if len(r.filter(backs)) != 50 { - t.Fatalf("backends length must be 50") - } -} - -func Test_FilterNone(t *testing.T) { - var backs []*naming.Instance - for i := 0; i < 100; i++ { - backs = append(backs, &naming.Instance{ - Zone: "sh1", - Env: "prod", - AppID: "2233", - Metadata: map[string]string{naming.MetaCluster: "c1"}, - Hostname: fmt.Sprintf("linux-%d", i), - Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)}, - LastTs: time.Now().Unix(), - }) - } - r := &Resolver{ - quit: make(chan struct{}, 1), - zone: "sh1", - subsetSize: 50, - clusters: map[string]struct{}{"c2": struct{}{}}, - } - - if len(r.filter(backs)) != 0 { - t.Fatalf("backends length must be 0") - } -} - -func Test_FilterSome(t *testing.T) { - var backs []*naming.Instance - for i := 0; i < 40; i++ { - backs = append(backs, &naming.Instance{ - Zone: "sh1", - Env: "prod", - AppID: "2233", - Metadata: map[string]string{naming.MetaCluster: "c1"}, - Hostname: fmt.Sprintf("linux-%d", i), - Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)}, - LastTs: time.Now().Unix(), - }) - } - for i := 50; i < 150; i++ { - backs = append(backs, &naming.Instance{ - Zone: "sh1", - Env: "prod", - AppID: "2233", - Metadata: map[string]string{naming.MetaCluster: "c2"}, - Hostname: fmt.Sprintf("linux-%d", i), - Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)}, - LastTs: time.Now().Unix(), - }) - } - r := &Resolver{ - quit: make(chan struct{}, 1), - zone: "sh1", - subsetSize: 50, - clusters: map[string]struct{}{"c2": struct{}{}}, - } - if len(r.filter(backs)) != 50 { - t.Fatalf("backends length must be 0") - } - - r2 := &Resolver{ - quit: make(chan struct{}, 1), - zone: "sh1", - subsetSize: 50, - clusters: map[string]struct{}{"c1": struct{}{}}, - } - if len(r2.filter(backs)) != 40 { - t.Fatalf("backends length must be 0") - } -} diff --git a/pkg/net/rpc/warden/resolver/test/mockdiscovery.go b/pkg/net/rpc/warden/resolver/test/mockdiscovery.go index 8efad613d..edf6fd75e 100644 --- a/pkg/net/rpc/warden/resolver/test/mockdiscovery.go +++ b/pkg/net/rpc/warden/resolver/test/mockdiscovery.go @@ -12,7 +12,7 @@ type mockDiscoveryBuilder struct { watchch map[string][]*mockDiscoveryResolver } -func (mb *mockDiscoveryBuilder) Build(id string) naming.Resolver { +func (mb *mockDiscoveryBuilder) Build(id string, opts ...naming.BuildOpt) naming.Resolver { mr := &mockDiscoveryResolver{ d: mb, watchch: make(chan struct{}, 1),