From 0dc30dde727813641abe23f513f065b2fa1d24a7 Mon Sep 17 00:00:00 2001 From: Tony Chen Date: Thu, 14 Apr 2022 13:48:20 +0800 Subject: [PATCH] feat(selector): add node scheme (#1932) * add node scheme --- selector/default_node.go | 11 +++++++++-- selector/filter/version_test.go | 2 ++ selector/node/direct/direct_test.go | 2 ++ selector/node/ewma/node_test.go | 3 +++ selector/p2c/p2c_test.go | 2 ++ selector/random/random_test.go | 2 ++ selector/selector.go | 3 +++ selector/selector_test.go | 2 ++ selector/wrr/wrr_test.go | 2 ++ transport/grpc/balancer.go | 2 +- transport/http/resolver.go | 2 +- 11 files changed, 29 insertions(+), 4 deletions(-) diff --git a/selector/default_node.go b/selector/default_node.go index b68899817..186629e78 100644 --- a/selector/default_node.go +++ b/selector/default_node.go @@ -8,6 +8,7 @@ import ( // DefaultNode is selector node type DefaultNode struct { + scheme string addr string weight *int64 version string @@ -15,6 +16,11 @@ type DefaultNode struct { metadata map[string]string } +// Scheme is node scheme +func (n *DefaultNode) Scheme() string { + return n.scheme +} + // Address is node address func (n *DefaultNode) Address() string { return n.addr @@ -41,9 +47,10 @@ func (n *DefaultNode) Metadata() map[string]string { } // NewNode new node -func NewNode(addr string, ins *registry.ServiceInstance) Node { +func NewNode(scheme, addr string, ins *registry.ServiceInstance) Node { n := &DefaultNode{ - addr: addr, + scheme: scheme, + addr: addr, } if ins != nil { n.name = ins.Name diff --git a/selector/filter/version_test.go b/selector/filter/version_test.go index 3845ec7c3..8c0895e3f 100644 --- a/selector/filter/version_test.go +++ b/selector/filter/version_test.go @@ -13,6 +13,7 @@ func TestVersion(t *testing.T) { f := Version("v2.0.0") var nodes []selector.Node nodes = append(nodes, selector.NewNode( + "http", "127.0.0.1:9090", ®istry.ServiceInstance{ ID: "127.0.0.1:9090", @@ -22,6 +23,7 @@ func TestVersion(t *testing.T) { })) nodes = append(nodes, selector.NewNode( + "http", "127.0.0.2:9090", ®istry.ServiceInstance{ ID: "127.0.0.2:9090", diff --git a/selector/node/direct/direct_test.go b/selector/node/direct/direct_test.go index d498c55c9..5da0007a6 100644 --- a/selector/node/direct/direct_test.go +++ b/selector/node/direct/direct_test.go @@ -13,6 +13,7 @@ import ( func TestDirect(t *testing.T) { b := &Builder{} wn := b.Build(selector.NewNode( + "http", "127.0.0.1:9090", ®istry.ServiceInstance{ ID: "127.0.0.1:9090", @@ -42,6 +43,7 @@ func TestDirect(t *testing.T) { func TestDirectDefaultWeight(t *testing.T) { b := &Builder{} wn := b.Build(selector.NewNode( + "http", "127.0.0.1:9090", ®istry.ServiceInstance{ ID: "127.0.0.1:9090", diff --git a/selector/node/ewma/node_test.go b/selector/node/ewma/node_test.go index 0da13fd4d..feac09f28 100644 --- a/selector/node/ewma/node_test.go +++ b/selector/node/ewma/node_test.go @@ -13,6 +13,7 @@ import ( func TestDirect(t *testing.T) { b := &Builder{} wn := b.Build(selector.NewNode( + "http", "127.0.0.1:9090", ®istry.ServiceInstance{ ID: "127.0.0.1:9090", @@ -53,6 +54,7 @@ func TestDirect(t *testing.T) { func TestDirectError(t *testing.T) { b := &Builder{} wn := b.Build(selector.NewNode( + "http", "127.0.0.1:9090", ®istry.ServiceInstance{ ID: "127.0.0.1:9090", @@ -89,6 +91,7 @@ func TestDirectErrorHandler(t *testing.T) { }, } wn := b.Build(selector.NewNode( + "http", "127.0.0.1:9090", ®istry.ServiceInstance{ ID: "127.0.0.1:9090", diff --git a/selector/p2c/p2c_test.go b/selector/p2c/p2c_test.go index 7065d499a..03d549e2c 100644 --- a/selector/p2c/p2c_test.go +++ b/selector/p2c/p2c_test.go @@ -21,6 +21,7 @@ func TestWrr3(t *testing.T) { for i := 0; i < 3; i++ { addr := fmt.Sprintf("127.0.0.%d:8080", i) nodes = append(nodes, selector.NewNode( + "http", addr, ®istry.ServiceInstance{ ID: addr, @@ -96,6 +97,7 @@ func TestOne(t *testing.T) { for i := 0; i < 1; i++ { addr := fmt.Sprintf("127.0.0.%d:8080", i) nodes = append(nodes, selector.NewNode( + "http", addr, ®istry.ServiceInstance{ ID: addr, diff --git a/selector/random/random_test.go b/selector/random/random_test.go index d8857411c..7e69f3ee6 100644 --- a/selector/random/random_test.go +++ b/selector/random/random_test.go @@ -13,6 +13,7 @@ func TestWrr(t *testing.T) { random := New(WithFilter(filter.Version("v2.0.0"))) var nodes []selector.Node nodes = append(nodes, selector.NewNode( + "http", "127.0.0.1:8080", ®istry.ServiceInstance{ ID: "127.0.0.1:8080", @@ -20,6 +21,7 @@ func TestWrr(t *testing.T) { Metadata: map[string]string{"weight": "10"}, })) nodes = append(nodes, selector.NewNode( + "http", "127.0.0.1:9090", ®istry.ServiceInstance{ ID: "127.0.0.1:9090", diff --git a/selector/selector.go b/selector/selector.go index ee423c9a6..79ecdf786 100644 --- a/selector/selector.go +++ b/selector/selector.go @@ -31,6 +31,9 @@ type Builder interface { // Node is node interface. type Node interface { + // Scheme is service node scheme + Scheme() string + // Address is the unique address under the same service Address() string diff --git a/selector/selector_test.go b/selector/selector_test.go index 9d624f3b5..25e70acde 100644 --- a/selector/selector_test.go +++ b/selector/selector_test.go @@ -89,6 +89,7 @@ func TestDefault(t *testing.T) { selector := builder.Build() var nodes []Node nodes = append(nodes, NewNode( + "http", "127.0.0.1:8080", ®istry.ServiceInstance{ ID: "127.0.0.1:8080", @@ -98,6 +99,7 @@ func TestDefault(t *testing.T) { Metadata: map[string]string{"weight": "10"}, })) nodes = append(nodes, NewNode( + "http", "127.0.0.1:9090", ®istry.ServiceInstance{ ID: "127.0.0.1:9090", diff --git a/selector/wrr/wrr_test.go b/selector/wrr/wrr_test.go index ed0693315..e3e96ed45 100644 --- a/selector/wrr/wrr_test.go +++ b/selector/wrr/wrr_test.go @@ -14,6 +14,7 @@ func TestWrr(t *testing.T) { wrr := New(WithFilter(filter.Version("v2.0.0"))) var nodes []selector.Node nodes = append(nodes, selector.NewNode( + "http", "127.0.0.1:8080", ®istry.ServiceInstance{ ID: "127.0.0.1:8080", @@ -21,6 +22,7 @@ func TestWrr(t *testing.T) { Metadata: map[string]string{"weight": "10"}, })) nodes = append(nodes, selector.NewNode( + "http", "127.0.0.1:9090", ®istry.ServiceInstance{ ID: "127.0.0.1:9090", diff --git a/transport/grpc/balancer.go b/transport/grpc/balancer.go index 0181792df..2a27c05ad 100644 --- a/transport/grpc/balancer.go +++ b/transport/grpc/balancer.go @@ -58,7 +58,7 @@ func (b *Builder) Build(info base.PickerBuildInfo) gBalancer.Picker { for conn, info := range info.ReadySCs { ins, _ := info.Address.Attributes.Value("rawServiceInstance").(*registry.ServiceInstance) nodes = append(nodes, &grpcNode{ - Node: selector.NewNode(info.Address.Addr, ins), + Node: selector.NewNode("grpc", info.Address.Addr, ins), subConn: conn, }) } diff --git a/transport/http/resolver.go b/transport/http/resolver.go index a0a1e979e..6a264cb33 100644 --- a/transport/http/resolver.go +++ b/transport/http/resolver.go @@ -122,7 +122,7 @@ func (r *resolver) update(services []*registry.ServiceInstance) bool { if ept == "" { continue } - nodes = append(nodes, selector.NewNode(ept, ins)) + nodes = append(nodes, selector.NewNode("http", ept, ins)) } if len(nodes) == 0 { r.logger.Warnf("[http resolver]Zero endpoint found,refused to write,set: %s ins: %v", r.target.Endpoint, nodes)