Merge pull request #196 from bilibili/resolver/fix_subset

fix resolver subset bug
pull/200/head
Tony 5 years ago committed by GitHub
commit b66679132a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      pkg/net/rpc/warden/CHANGELOG.md
  2. 49
      pkg/net/rpc/warden/resolver/resolver.go
  3. 125
      pkg/net/rpc/warden/resolver/resolver_test.go

@ -1,5 +1,8 @@
### net/rpc/warden
##### Version 1.1.18
1. 修复resolver过滤导致的子集bug
##### Version 1.1.17
1. 移除 bbr feature flag,默认开启自适应限流

@ -135,14 +135,41 @@ func (r *Resolver) updateproc() {
instances = append(instances, value...)
}
}
if r.subsetSize > 0 && len(instances) > 0 {
instances = r.subset(instances, env.Hostname, r.subsetSize)
}
if len(instances) > 0 {
r.newAddress(instances)
r.newAddress(r.filter(instances))
}
}
}
func (r *Resolver) filter(backends []*naming.Instance) (instances []*naming.Instance) {
for _, ins := range backends {
//如果r.clusters的长度大于0说明需要进行集群选择
if _, ok := r.clusters[ins.Metadata[naming.MetaCluster]]; !ok && len(r.clusters) > 0 {
continue
}
var addr string
for _, a := range ins.Addrs {
u, err := url.Parse(a)
if err == nil && u.Scheme == Scheme {
addr = u.Host
}
}
if addr == "" {
fmt.Fprintf(os.Stderr, "resolver: app(%s,%s) no valid grpc address(%v) found!", ins.AppID, ins.Hostname, ins.Addrs)
log.Warn("resolver: invalid rpc address(%s,%s,%v) found!", ins.AppID, ins.Hostname, ins.Addrs)
continue
}
instances = append(instances, ins)
}
if len(instances) == 0 {
for _, bkend := range backends {
log.Warn("resolver: backends(%d) invalid instance:%v", len(backends), bkend)
}
return
}
if r.subsetSize > 0 {
instances = r.subset(instances, env.Hostname, r.subsetSize)
}
return
}
func (r *Resolver) subset(backends []*naming.Instance, clientID string, size int64) []*naming.Instance {
@ -172,12 +199,6 @@ func (r *Resolver) newAddress(instances []*naming.Instance) {
}
addrs := make([]resolver.Address, 0, len(instances))
for _, ins := range instances {
if len(r.clusters) > 0 {
if _, ok := r.clusters[ins.Metadata[naming.MetaCluster]]; !ok {
continue
}
}
var weight int64
if weight, _ = strconv.ParseInt(ins.Metadata[naming.MetaWeight], 10, 64); weight <= 0 {
weight = 10
@ -189,11 +210,6 @@ func (r *Resolver) newAddress(instances []*naming.Instance) {
rpc = u.Host
}
}
if rpc == "" {
fmt.Fprintf(os.Stderr, "warden/resolver: app(%s,%s) no valid grpc address(%v) found!", ins.AppID, ins.Hostname, ins.Addrs)
log.Warn("warden/resolver: invalid rpc address(%s,%s,%v) found!", ins.AppID, ins.Hostname, ins.Addrs)
continue
}
addr := resolver.Address{
Addr: rpc,
Type: resolver.Backend,
@ -202,5 +218,6 @@ func (r *Resolver) newAddress(instances []*naming.Instance) {
}
addrs = append(addrs, addr)
}
log.Info("resolver: finally get %d instances", len(addrs))
r.cc.NewAddress(addrs)
}

@ -0,0 +1,125 @@
package resolver
import (
"fmt"
"testing"
"time"
"github.com/bilibili/kratos/pkg/naming"
)
func Test_FilterLittle(t *testing.T) {
var backs []*naming.Instance
for i := 0; i < 3; i++ {
backs = append(backs, &naming.Instance{
Zone: "sh1",
Env: "prod",
AppID: "2233",
Hostname: fmt.Sprintf("linux-%d", i),
Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)},
LastTs: time.Now().Unix(),
})
}
r := &Resolver{
quit: make(chan struct{}, 1),
zone: "sh1",
subsetSize: 50,
}
if len(r.filter(backs)) != 3 {
t.Fatalf("backends length must be 3")
}
}
func Test_FilterBig(t *testing.T) {
var backs []*naming.Instance
for i := 0; i < 100; i++ {
backs = append(backs, &naming.Instance{
Zone: "sh1",
Env: "prod",
AppID: "2233",
Hostname: fmt.Sprintf("linux-%d", i),
Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)},
LastTs: time.Now().Unix(),
})
}
r := &Resolver{
quit: make(chan struct{}, 1),
zone: "sh1",
subsetSize: 50,
}
if len(r.filter(backs)) != 50 {
t.Fatalf("backends length must be 50")
}
}
func Test_FilterNone(t *testing.T) {
var backs []*naming.Instance
for i := 0; i < 100; i++ {
backs = append(backs, &naming.Instance{
Zone: "sh1",
Env: "prod",
AppID: "2233",
Metadata: map[string]string{naming.MetaCluster: "c1"},
Hostname: fmt.Sprintf("linux-%d", i),
Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)},
LastTs: time.Now().Unix(),
})
}
r := &Resolver{
quit: make(chan struct{}, 1),
zone: "sh1",
subsetSize: 50,
clusters: map[string]struct{}{"c2": struct{}{}},
}
if len(r.filter(backs)) != 0 {
t.Fatalf("backends length must be 0")
}
}
func Test_FilterSome(t *testing.T) {
var backs []*naming.Instance
for i := 0; i < 40; i++ {
backs = append(backs, &naming.Instance{
Zone: "sh1",
Env: "prod",
AppID: "2233",
Metadata: map[string]string{naming.MetaCluster: "c1"},
Hostname: fmt.Sprintf("linux-%d", i),
Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)},
LastTs: time.Now().Unix(),
})
}
for i := 50; i < 150; i++ {
backs = append(backs, &naming.Instance{
Zone: "sh1",
Env: "prod",
AppID: "2233",
Metadata: map[string]string{naming.MetaCluster: "c2"},
Hostname: fmt.Sprintf("linux-%d", i),
Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)},
LastTs: time.Now().Unix(),
})
}
r := &Resolver{
quit: make(chan struct{}, 1),
zone: "sh1",
subsetSize: 50,
clusters: map[string]struct{}{"c2": struct{}{}},
}
if len(r.filter(backs)) != 50 {
t.Fatalf("backends length must be 0")
}
r2 := &Resolver{
quit: make(chan struct{}, 1),
zone: "sh1",
subsetSize: 50,
clusters: map[string]struct{}{"c1": struct{}{}},
}
if len(r2.filter(backs)) != 40 {
t.Fatalf("backends length must be 0")
}
}
Loading…
Cancel
Save