test : add selector & balancer test (#1577)

* add selector test

* add balancer test

* add node and selector

* add ewma test

* fix datarace

* fix datarace
pull/1580/head
longxboy 3 years ago committed by GitHub
parent 7cd9503b95
commit ae57ae9bde
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 23
      selector/default_node.go
  2. 0
      selector/default_selector.go
  3. 36
      selector/filter/version_test.go
  4. 53
      selector/node/direct/direct_test.go
  5. 96
      selector/node/ewma/node_test.go
  6. 5
      selector/p2c/p2c.go
  7. 92
      selector/p2c/p2c_test.go
  8. 54
      selector/random/random_test.go
  9. 127
      selector/selector_test.go
  10. 52
      selector/wrr/wrr_test.go
  11. 3
      transport/grpc/balancer.go
  12. 3
      transport/http/resolver.go

@ -1,14 +1,13 @@
package node
package selector
import (
"strconv"
"github.com/go-kratos/kratos/v2/registry"
"github.com/go-kratos/kratos/v2/selector"
)
// Node is slector node
type Node struct {
// DefaultNode is slector node
type DefaultNode struct {
addr string
weight *int64
version string
@ -17,33 +16,33 @@ type Node struct {
}
// Address is node address
func (n *Node) Address() string {
func (n *DefaultNode) Address() string {
return n.addr
}
// ServiceName is node serviceName
func (n *Node) ServiceName() string {
func (n *DefaultNode) ServiceName() string {
return n.name
}
// InitialWeight is node initialWeight
func (n *Node) InitialWeight() *int64 {
func (n *DefaultNode) InitialWeight() *int64 {
return n.weight
}
// Version is node version
func (n *Node) Version() string {
func (n *DefaultNode) Version() string {
return n.version
}
// Metadata is node metadata
func (n *Node) Metadata() map[string]string {
func (n *DefaultNode) Metadata() map[string]string {
return n.metadata
}
// New node
func New(addr string, ins *registry.ServiceInstance) selector.Node {
n := &Node{
// NewNode new node
func NewNode(addr string, ins *registry.ServiceInstance) Node {
n := &DefaultNode{
addr: addr,
}
if ins != nil {

@ -0,0 +1,36 @@
package filter
import (
"context"
"testing"
"github.com/go-kratos/kratos/v2/registry"
"github.com/go-kratos/kratos/v2/selector"
"github.com/stretchr/testify/assert"
)
func TestVersion(t *testing.T) {
f := Version("v2.0.0")
var nodes []selector.Node
nodes = append(nodes, selector.NewNode(
"127.0.0.1:9090",
&registry.ServiceInstance{
ID: "127.0.0.1:9090",
Name: "helloworld",
Version: "v1.0.0",
Endpoints: []string{"http://127.0.0.1:9090"},
}))
nodes = append(nodes, selector.NewNode(
"127.0.0.2:9090",
&registry.ServiceInstance{
ID: "127.0.0.2:9090",
Name: "helloworld",
Version: "v2.0.0",
Endpoints: []string{"http://127.0.0.2:9090"},
}))
n := f(context.Background(), nodes)
assert.Equal(t, 1, len(n))
assert.Equal(t, "127.0.0.2:9090", n[0].Address())
}

@ -0,0 +1,53 @@
package direct
import (
"context"
"testing"
"time"
"github.com/go-kratos/kratos/v2/registry"
"github.com/go-kratos/kratos/v2/selector"
"github.com/stretchr/testify/assert"
)
func TestDirect(t *testing.T) {
b := &Builder{}
wn := b.Build(selector.NewNode(
"127.0.0.1:9090",
&registry.ServiceInstance{
ID: "127.0.0.1:9090",
Name: "helloworld",
Version: "v1.0.0",
Endpoints: []string{"http://127.0.0.1:9090"},
Metadata: map[string]string{"weight": "10"},
}))
done := wn.Pick()
assert.NotNil(t, done)
time.Sleep(time.Millisecond * 10)
done(context.Background(), selector.DoneInfo{})
assert.Equal(t, float64(10), wn.Weight())
assert.Greater(t, time.Millisecond*15, wn.PickElapsed())
assert.Less(t, time.Millisecond*5, wn.PickElapsed())
}
func TestDirectDefaultWeight(t *testing.T) {
b := &Builder{}
wn := b.Build(selector.NewNode(
"127.0.0.1:9090",
&registry.ServiceInstance{
ID: "127.0.0.1:9090",
Name: "helloworld",
Version: "v1.0.0",
Endpoints: []string{"http://127.0.0.1:9090"},
}))
done := wn.Pick()
assert.NotNil(t, done)
time.Sleep(time.Millisecond * 10)
done(context.Background(), selector.DoneInfo{})
assert.Equal(t, float64(100), wn.Weight())
assert.Greater(t, time.Millisecond*15, wn.PickElapsed())
assert.Less(t, time.Millisecond*5, wn.PickElapsed())
}

@ -0,0 +1,96 @@
package ewma
import (
"context"
"testing"
"time"
"github.com/go-kratos/kratos/v2/registry"
"github.com/go-kratos/kratos/v2/selector"
"github.com/stretchr/testify/assert"
)
func TestDirect(t *testing.T) {
b := &Builder{}
wn := b.Build(selector.NewNode(
"127.0.0.1:9090",
&registry.ServiceInstance{
ID: "127.0.0.1:9090",
Name: "helloworld",
Version: "v1.0.0",
Endpoints: []string{"http://127.0.0.1:9090"},
Metadata: map[string]string{"weight": "10"},
}))
assert.Equal(t, float64(100), wn.Weight())
done := wn.Pick()
assert.NotNil(t, done)
done2 := wn.Pick()
assert.NotNil(t, done2)
time.Sleep(time.Millisecond * 10)
done(context.Background(), selector.DoneInfo{})
assert.Less(t, float64(30000), wn.Weight())
assert.Greater(t, float64(60000), wn.Weight())
assert.Greater(t, time.Millisecond*15, wn.PickElapsed())
assert.Less(t, time.Millisecond*5, wn.PickElapsed())
}
func TestDirectError(t *testing.T) {
b := &Builder{}
wn := b.Build(selector.NewNode(
"127.0.0.1:9090",
&registry.ServiceInstance{
ID: "127.0.0.1:9090",
Name: "helloworld",
Version: "v1.0.0",
Endpoints: []string{"http://127.0.0.1:9090"},
Metadata: map[string]string{"weight": "10"},
}))
for i := 0; i < 5; i++ {
var err error
if i != 0 {
err = context.DeadlineExceeded
}
done := wn.Pick()
assert.NotNil(t, done)
time.Sleep(time.Millisecond * 20)
done(context.Background(), selector.DoneInfo{Err: err})
}
assert.Less(t, float64(30000), wn.Weight())
assert.Greater(t, float64(60000), wn.Weight())
}
func TestDirectErrorHandler(t *testing.T) {
b := &Builder{
ErrHandler: func(err error) bool {
return err != nil
},
}
wn := b.Build(selector.NewNode(
"127.0.0.1:9090",
&registry.ServiceInstance{
ID: "127.0.0.1:9090",
Name: "helloworld",
Version: "v1.0.0",
Endpoints: []string{"http://127.0.0.1:9090"},
Metadata: map[string]string{"weight": "10"},
}))
for i := 0; i < 5; i++ {
var err error
if i != 0 {
err = context.DeadlineExceeded
}
done := wn.Pick()
assert.NotNil(t, done)
time.Sleep(time.Millisecond * 20)
done(context.Background(), selector.DoneInfo{Err: err})
}
assert.Less(t, float64(30000), wn.Weight())
assert.Greater(t, float64(60000), wn.Weight())
}

@ -46,8 +46,9 @@ type Balancer struct {
// choose two distinct nodes.
func (s *Balancer) prePick(nodes []selector.WeightedNode) (nodeA selector.WeightedNode, nodeB selector.WeightedNode) {
a := s.r.Intn(len(nodes))
b := s.r.Intn(len(nodes) - 1)
source := rand.NewSource(time.Now().UnixNano())
a := rand.New(source).Intn(len(nodes))
b := rand.New(source).Intn(len(nodes) - 1)
if b >= a {
b = b + 1
}

@ -0,0 +1,92 @@
package p2c
import (
"context"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"testing"
"time"
"github.com/go-kratos/kratos/v2/registry"
"github.com/go-kratos/kratos/v2/selector"
"github.com/go-kratos/kratos/v2/selector/filter"
"github.com/stretchr/testify/assert"
)
func TestWrr3(t *testing.T) {
p2c := New(WithFilter(filter.Version("v2.0.0")))
var nodes []selector.Node
for i := 0; i < 3; i++ {
addr := fmt.Sprintf("127.0.0.%d:8080", i)
nodes = append(nodes, selector.NewNode(
addr,
&registry.ServiceInstance{
ID: addr,
Version: "v2.0.0",
Metadata: map[string]string{"weight": "10"},
}))
}
p2c.Apply(nodes)
var count1, count2, count3 int64
group := &sync.WaitGroup{}
var lk sync.Mutex
for i := 0; i < 9000; i++ {
group.Add(1)
go func() {
defer group.Done()
lk.Lock()
d := time.Duration(rand.Intn(500)) * time.Millisecond
lk.Unlock()
time.Sleep(d)
n, done, err := p2c.Select(context.Background())
assert.Nil(t, err)
assert.NotNil(t, done)
assert.NotNil(t, n)
time.Sleep(time.Millisecond * 10)
done(context.Background(), selector.DoneInfo{})
if n.Address() == "127.0.0.0:8080" {
atomic.AddInt64(&count1, 1)
} else if n.Address() == "127.0.0.1:8080" {
atomic.AddInt64(&count2, 1)
} else if n.Address() == "127.0.0.2:8080" {
atomic.AddInt64(&count3, 1)
}
}()
}
group.Wait()
assert.Greater(t, count1, int64(2500))
assert.Less(t, count1, int64(3500))
assert.Greater(t, count2, int64(2500))
assert.Less(t, count2, int64(3500))
assert.Greater(t, count3, int64(2500))
assert.Less(t, count3, int64(3500))
}
func TestEmpty(t *testing.T) {
b := &Balancer{}
_, _, err := b.Pick(context.Background(), []selector.WeightedNode{})
assert.NotNil(t, err)
}
func TestOne(t *testing.T) {
p2c := New(WithFilter(filter.Version("v2.0.0")))
var nodes []selector.Node
for i := 0; i < 1; i++ {
addr := fmt.Sprintf("127.0.0.%d:8080", i)
nodes = append(nodes, selector.NewNode(
addr,
&registry.ServiceInstance{
ID: addr,
Version: "v2.0.0",
Metadata: map[string]string{"weight": "10"},
}))
}
p2c.Apply(nodes)
n, done, err := p2c.Select(context.Background())
assert.Nil(t, err)
assert.NotNil(t, done)
assert.NotNil(t, n)
assert.Equal(t, "127.0.0.0:8080", n.Address())
}

@ -0,0 +1,54 @@
package random
import (
"context"
"testing"
"github.com/go-kratos/kratos/v2/registry"
"github.com/go-kratos/kratos/v2/selector"
"github.com/go-kratos/kratos/v2/selector/filter"
"github.com/stretchr/testify/assert"
)
func TestWrr(t *testing.T) {
random := New(WithFilter(filter.Version("v2.0.0")))
var nodes []selector.Node
nodes = append(nodes, selector.NewNode(
"127.0.0.1:8080",
&registry.ServiceInstance{
ID: "127.0.0.1:8080",
Version: "v2.0.0",
Metadata: map[string]string{"weight": "10"},
}))
nodes = append(nodes, selector.NewNode(
"127.0.0.1:9090",
&registry.ServiceInstance{
ID: "127.0.0.1:9090",
Version: "v2.0.0",
Metadata: map[string]string{"weight": "20"},
}))
random.Apply(nodes)
var count1, count2 int
for i := 0; i < 200; i++ {
n, done, err := random.Select(context.Background())
assert.Nil(t, err)
assert.NotNil(t, done)
assert.NotNil(t, n)
done(context.Background(), selector.DoneInfo{})
if n.Address() == "127.0.0.1:8080" {
count1++
} else if n.Address() == "127.0.0.1:9090" {
count2++
}
}
assert.Greater(t, count1, 80)
assert.Less(t, count1, 120)
assert.Greater(t, count2, 80)
assert.Less(t, count2, 120)
}
func TestEmpty(t *testing.T) {
b := &Balancer{}
_, _, err := b.Pick(context.Background(), []selector.WeightedNode{})
assert.NotNil(t, err)
}

@ -0,0 +1,127 @@
package selector
import (
"context"
"math/rand"
"sync/atomic"
"testing"
"time"
"github.com/go-kratos/kratos/v2/registry"
"github.com/stretchr/testify/assert"
)
type mockWeightedNode struct {
Node
lastPick int64
}
// Weight is the runtime calculated weight
func (n *mockWeightedNode) Weight() float64 {
if n.InitialWeight() != nil {
return float64(*n.InitialWeight())
}
return 100
}
// Pick the node
func (n *mockWeightedNode) Pick() DoneFunc {
now := time.Now().UnixNano()
atomic.StoreInt64(&n.lastPick, now)
return func(ctx context.Context, di DoneInfo) {}
}
// PickElapsed is time elapsed since the latest pick
func (n *mockWeightedNode) PickElapsed() time.Duration {
return time.Duration(time.Now().UnixNano() - atomic.LoadInt64(&n.lastPick))
}
type mockWeightedNodeBuilder struct{}
func (b *mockWeightedNodeBuilder) Build(n Node) WeightedNode {
return &mockWeightedNode{Node: n}
}
func mockFilter(version string) Filter {
return func(_ context.Context, nodes []Node) []Node {
filters := make([]Node, 0, len(nodes))
for _, n := range nodes {
if n.Version() == version {
filters = append(filters, n)
}
}
return filters
}
}
type mockBalancerBuilder struct{}
func (b *mockBalancerBuilder) Build() Balancer {
return &mockBalancer{}
}
type mockBalancer struct{}
func (b *mockBalancer) Pick(ctx context.Context, nodes []WeightedNode) (selected WeightedNode, done DoneFunc, err error) {
if len(nodes) == 0 {
err = ErrNoAvailable
return
}
cur := rand.Intn(len(nodes))
selected = nodes[cur]
done = selected.Pick()
return
}
func TestDefault(t *testing.T) {
builder := DefaultBuilder{
Node: &mockWeightedNodeBuilder{},
Filters: []Filter{mockFilter("v2.0.0")},
Balancer: &mockBalancerBuilder{},
}
selector := builder.Build()
var nodes []Node
nodes = append(nodes, NewNode(
"127.0.0.1:8080",
&registry.ServiceInstance{
ID: "127.0.0.1:8080",
Name: "helloworld",
Version: "v2.0.0",
Endpoints: []string{"http://127.0.0.1:8080"},
Metadata: map[string]string{"weight": "10"},
}))
nodes = append(nodes, NewNode(
"127.0.0.1:9090",
&registry.ServiceInstance{
ID: "127.0.0.1:9090",
Name: "helloworld",
Version: "v1.0.0",
Endpoints: []string{"http://127.0.0.1:9090"},
Metadata: map[string]string{"weight": "10"},
}))
selector.Apply(nodes)
n, done, err := selector.Select(context.Background(), WithFilter(mockFilter("v2.0.0")))
assert.Nil(t, err)
assert.NotNil(t, n)
assert.NotNil(t, done)
assert.Equal(t, "v2.0.0", n.Version())
assert.NotNil(t, n.Address())
assert.Equal(t, int64(10), *n.InitialWeight())
assert.NotNil(t, n.Metadata())
assert.Equal(t, "helloworld", n.ServiceName())
done(context.Background(), DoneInfo{})
// no v3.0.0 instance
n, done, err = selector.Select(context.Background(), WithFilter(mockFilter("v3.0.0")))
assert.Equal(t, ErrNoAvailable, err)
assert.Nil(t, done)
assert.Nil(t, n)
// apply zero instance
selector.Apply([]Node{})
n, done, err = selector.Select(context.Background(), WithFilter(mockFilter("v2.0.0")))
assert.Equal(t, ErrNoAvailable, err)
assert.Nil(t, done)
assert.Nil(t, n)
}

@ -0,0 +1,52 @@
package wrr
import (
"context"
"testing"
"github.com/go-kratos/kratos/v2/registry"
"github.com/go-kratos/kratos/v2/selector"
"github.com/go-kratos/kratos/v2/selector/filter"
"github.com/stretchr/testify/assert"
)
func TestWrr(t *testing.T) {
wrr := New(WithFilter(filter.Version("v2.0.0")))
var nodes []selector.Node
nodes = append(nodes, selector.NewNode(
"127.0.0.1:8080",
&registry.ServiceInstance{
ID: "127.0.0.1:8080",
Version: "v2.0.0",
Metadata: map[string]string{"weight": "10"},
}))
nodes = append(nodes, selector.NewNode(
"127.0.0.1:9090",
&registry.ServiceInstance{
ID: "127.0.0.1:9090",
Version: "v2.0.0",
Metadata: map[string]string{"weight": "20"},
}))
wrr.Apply(nodes)
var count1, count2 int
for i := 0; i < 90; i++ {
n, done, err := wrr.Select(context.Background())
assert.Nil(t, err)
assert.NotNil(t, done)
assert.NotNil(t, n)
done(context.Background(), selector.DoneInfo{})
if n.Address() == "127.0.0.1:8080" {
count1++
} else if n.Address() == "127.0.0.1:9090" {
count2++
}
}
assert.Equal(t, 30, count1)
assert.Equal(t, 60, count2)
}
func TestEmpty(t *testing.T) {
b := &Balancer{}
_, _, err := b.Pick(context.Background(), []selector.WeightedNode{})
assert.NotNil(t, err)
}

@ -5,7 +5,6 @@ import (
"github.com/go-kratos/kratos/v2/registry"
"github.com/go-kratos/kratos/v2/selector"
"github.com/go-kratos/kratos/v2/selector/node"
"github.com/go-kratos/kratos/v2/selector/p2c"
"github.com/go-kratos/kratos/v2/selector/random"
"github.com/go-kratos/kratos/v2/selector/wrr"
@ -59,7 +58,7 @@ func (b *Builder) Build(info base.PickerBuildInfo) gBalancer.Picker {
subConns[info.Address.Addr] = conn
ins, _ := info.Address.Attributes.Value("rawServiceInstance").(*registry.ServiceInstance)
nodes = append(nodes, node.New(info.Address.Addr, ins))
nodes = append(nodes, selector.NewNode(info.Address.Addr, ins))
}
p := &Picker{
selector: b.builder.Build(),

@ -11,7 +11,6 @@ import (
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/registry"
"github.com/go-kratos/kratos/v2/selector"
"github.com/go-kratos/kratos/v2/selector/node"
)
// Target is resolver target
@ -123,7 +122,7 @@ func (r *resolver) update(services []*registry.ServiceInstance) bool {
if ept == "" {
continue
}
nodes = append(nodes, node.New(ept, ins))
nodes = append(nodes, selector.NewNode(ept, ins))
}
if len(nodes) == 0 {
r.logger.Warnf("[http resovler]Zero endpoint found,refused to write,ser: %s ins: %v", r.target.Endpoint, nodes)

Loading…
Cancel
Save