diff --git a/pkg/net/rpc/warden/CHANGELOG.md b/pkg/net/rpc/warden/CHANGELOG.md index 79bec5a3e..5c453bb02 100644 --- a/pkg/net/rpc/warden/CHANGELOG.md +++ b/pkg/net/rpc/warden/CHANGELOG.md @@ -1,5 +1,8 @@ ### net/rpc/warden +##### Version 1.1.18 +1. 修复resolver过滤导致的子集bug + ##### Version 1.1.17 1. 移除 bbr feature flag,默认开启自适应限流 diff --git a/pkg/net/rpc/warden/resolver/resolver.go b/pkg/net/rpc/warden/resolver/resolver.go index 91c05af43..06b0bf8e5 100644 --- a/pkg/net/rpc/warden/resolver/resolver.go +++ b/pkg/net/rpc/warden/resolver/resolver.go @@ -135,14 +135,41 @@ func (r *Resolver) updateproc() { instances = append(instances, value...) } } - if r.subsetSize > 0 && len(instances) > 0 { - instances = r.subset(instances, env.Hostname, r.subsetSize) - } - if len(instances) > 0 { - r.newAddress(instances) + r.newAddress(r.filter(instances)) + } + } +} + +func (r *Resolver) filter(backends []*naming.Instance) (instances []*naming.Instance) { + for _, ins := range backends { + //如果r.clusters的长度大于0说明需要进行集群选择 + if _, ok := r.clusters[ins.Metadata[naming.MetaCluster]]; !ok && len(r.clusters) > 0 { + continue + } + var addr string + for _, a := range ins.Addrs { + u, err := url.Parse(a) + if err == nil && u.Scheme == Scheme { + addr = u.Host } } + if addr == "" { + fmt.Fprintf(os.Stderr, "resolver: app(%s,%s) no valid grpc address(%v) found!", ins.AppID, ins.Hostname, ins.Addrs) + log.Warn("resolver: invalid rpc address(%s,%s,%v) found!", ins.AppID, ins.Hostname, ins.Addrs) + continue + } + instances = append(instances, ins) + } + if len(instances) == 0 { + for _, bkend := range backends { + log.Warn("resolver: backends(%d) invalid instance:%v", len(backends), bkend) + } + return + } + if r.subsetSize > 0 { + instances = r.subset(instances, env.Hostname, r.subsetSize) } + return } func (r *Resolver) subset(backends []*naming.Instance, clientID string, size int64) []*naming.Instance { @@ -172,12 +199,6 @@ func (r *Resolver) newAddress(instances []*naming.Instance) { } addrs := make([]resolver.Address, 0, len(instances)) for _, ins := range instances { - if len(r.clusters) > 0 { - if _, ok := r.clusters[ins.Metadata[naming.MetaCluster]]; !ok { - continue - } - } - var weight int64 if weight, _ = strconv.ParseInt(ins.Metadata[naming.MetaWeight], 10, 64); weight <= 0 { weight = 10 @@ -189,11 +210,6 @@ func (r *Resolver) newAddress(instances []*naming.Instance) { rpc = u.Host } } - if rpc == "" { - fmt.Fprintf(os.Stderr, "warden/resolver: app(%s,%s) no valid grpc address(%v) found!", ins.AppID, ins.Hostname, ins.Addrs) - log.Warn("warden/resolver: invalid rpc address(%s,%s,%v) found!", ins.AppID, ins.Hostname, ins.Addrs) - continue - } addr := resolver.Address{ Addr: rpc, Type: resolver.Backend, @@ -202,5 +218,6 @@ func (r *Resolver) newAddress(instances []*naming.Instance) { } addrs = append(addrs, addr) } + log.Info("resolver: finally get %d instances", len(addrs)) r.cc.NewAddress(addrs) } diff --git a/pkg/net/rpc/warden/resolver/resolver_test.go b/pkg/net/rpc/warden/resolver/resolver_test.go new file mode 100644 index 000000000..54b0998c1 --- /dev/null +++ b/pkg/net/rpc/warden/resolver/resolver_test.go @@ -0,0 +1,125 @@ +package resolver + +import ( + "fmt" + "testing" + "time" + + "github.com/bilibili/kratos/pkg/naming" +) + +func Test_FilterLittle(t *testing.T) { + var backs []*naming.Instance + for i := 0; i < 3; i++ { + backs = append(backs, &naming.Instance{ + Zone: "sh1", + Env: "prod", + AppID: "2233", + Hostname: fmt.Sprintf("linux-%d", i), + Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)}, + LastTs: time.Now().Unix(), + }) + } + r := &Resolver{ + quit: make(chan struct{}, 1), + zone: "sh1", + subsetSize: 50, + } + + if len(r.filter(backs)) != 3 { + t.Fatalf("backends length must be 3") + } +} + +func Test_FilterBig(t *testing.T) { + var backs []*naming.Instance + for i := 0; i < 100; i++ { + backs = append(backs, &naming.Instance{ + Zone: "sh1", + Env: "prod", + AppID: "2233", + Hostname: fmt.Sprintf("linux-%d", i), + Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)}, + LastTs: time.Now().Unix(), + }) + } + r := &Resolver{ + quit: make(chan struct{}, 1), + zone: "sh1", + subsetSize: 50, + } + + if len(r.filter(backs)) != 50 { + t.Fatalf("backends length must be 50") + } +} + +func Test_FilterNone(t *testing.T) { + var backs []*naming.Instance + for i := 0; i < 100; i++ { + backs = append(backs, &naming.Instance{ + Zone: "sh1", + Env: "prod", + AppID: "2233", + Metadata: map[string]string{naming.MetaCluster: "c1"}, + Hostname: fmt.Sprintf("linux-%d", i), + Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)}, + LastTs: time.Now().Unix(), + }) + } + r := &Resolver{ + quit: make(chan struct{}, 1), + zone: "sh1", + subsetSize: 50, + clusters: map[string]struct{}{"c2": struct{}{}}, + } + + if len(r.filter(backs)) != 0 { + t.Fatalf("backends length must be 0") + } +} + +func Test_FilterSome(t *testing.T) { + var backs []*naming.Instance + for i := 0; i < 40; i++ { + backs = append(backs, &naming.Instance{ + Zone: "sh1", + Env: "prod", + AppID: "2233", + Metadata: map[string]string{naming.MetaCluster: "c1"}, + Hostname: fmt.Sprintf("linux-%d", i), + Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)}, + LastTs: time.Now().Unix(), + }) + } + for i := 50; i < 150; i++ { + backs = append(backs, &naming.Instance{ + Zone: "sh1", + Env: "prod", + AppID: "2233", + Metadata: map[string]string{naming.MetaCluster: "c2"}, + Hostname: fmt.Sprintf("linux-%d", i), + Addrs: []string{fmt.Sprintf("grpc://127.0.0.%d:9000", i)}, + LastTs: time.Now().Unix(), + }) + } + r := &Resolver{ + quit: make(chan struct{}, 1), + zone: "sh1", + subsetSize: 50, + clusters: map[string]struct{}{"c2": struct{}{}}, + } + if len(r.filter(backs)) != 50 { + t.Fatalf("backends length must be 0") + } + + r2 := &Resolver{ + quit: make(chan struct{}, 1), + zone: "sh1", + subsetSize: 50, + clusters: map[string]struct{}{"c1": struct{}{}}, + } + if len(r2.filter(backs)) != 40 { + t.Fatalf("backends length must be 0") + } +}