feat(selector): add node scheme (#1932)

* add node scheme
status-code-override
Tony Chen 3 years ago committed by chenzhihui
parent 13e5fd26af
commit 0dc30dde72
  1. 9
      selector/default_node.go
  2. 2
      selector/filter/version_test.go
  3. 2
      selector/node/direct/direct_test.go
  4. 3
      selector/node/ewma/node_test.go
  5. 2
      selector/p2c/p2c_test.go
  6. 2
      selector/random/random_test.go
  7. 3
      selector/selector.go
  8. 2
      selector/selector_test.go
  9. 2
      selector/wrr/wrr_test.go
  10. 2
      transport/grpc/balancer.go
  11. 2
      transport/http/resolver.go

@ -8,6 +8,7 @@ import (
// DefaultNode is selector node // DefaultNode is selector node
type DefaultNode struct { type DefaultNode struct {
scheme string
addr string addr string
weight *int64 weight *int64
version string version string
@ -15,6 +16,11 @@ type DefaultNode struct {
metadata map[string]string metadata map[string]string
} }
// Scheme is node scheme
func (n *DefaultNode) Scheme() string {
return n.scheme
}
// Address is node address // Address is node address
func (n *DefaultNode) Address() string { func (n *DefaultNode) Address() string {
return n.addr return n.addr
@ -41,8 +47,9 @@ func (n *DefaultNode) Metadata() map[string]string {
} }
// NewNode new node // NewNode new node
func NewNode(addr string, ins *registry.ServiceInstance) Node { func NewNode(scheme, addr string, ins *registry.ServiceInstance) Node {
n := &DefaultNode{ n := &DefaultNode{
scheme: scheme,
addr: addr, addr: addr,
} }
if ins != nil { if ins != nil {

@ -13,6 +13,7 @@ func TestVersion(t *testing.T) {
f := Version("v2.0.0") f := Version("v2.0.0")
var nodes []selector.Node var nodes []selector.Node
nodes = append(nodes, selector.NewNode( nodes = append(nodes, selector.NewNode(
"http",
"127.0.0.1:9090", "127.0.0.1:9090",
&registry.ServiceInstance{ &registry.ServiceInstance{
ID: "127.0.0.1:9090", ID: "127.0.0.1:9090",
@ -22,6 +23,7 @@ func TestVersion(t *testing.T) {
})) }))
nodes = append(nodes, selector.NewNode( nodes = append(nodes, selector.NewNode(
"http",
"127.0.0.2:9090", "127.0.0.2:9090",
&registry.ServiceInstance{ &registry.ServiceInstance{
ID: "127.0.0.2:9090", ID: "127.0.0.2:9090",

@ -13,6 +13,7 @@ import (
func TestDirect(t *testing.T) { func TestDirect(t *testing.T) {
b := &Builder{} b := &Builder{}
wn := b.Build(selector.NewNode( wn := b.Build(selector.NewNode(
"http",
"127.0.0.1:9090", "127.0.0.1:9090",
&registry.ServiceInstance{ &registry.ServiceInstance{
ID: "127.0.0.1:9090", ID: "127.0.0.1:9090",
@ -42,6 +43,7 @@ func TestDirect(t *testing.T) {
func TestDirectDefaultWeight(t *testing.T) { func TestDirectDefaultWeight(t *testing.T) {
b := &Builder{} b := &Builder{}
wn := b.Build(selector.NewNode( wn := b.Build(selector.NewNode(
"http",
"127.0.0.1:9090", "127.0.0.1:9090",
&registry.ServiceInstance{ &registry.ServiceInstance{
ID: "127.0.0.1:9090", ID: "127.0.0.1:9090",

@ -13,6 +13,7 @@ import (
func TestDirect(t *testing.T) { func TestDirect(t *testing.T) {
b := &Builder{} b := &Builder{}
wn := b.Build(selector.NewNode( wn := b.Build(selector.NewNode(
"http",
"127.0.0.1:9090", "127.0.0.1:9090",
&registry.ServiceInstance{ &registry.ServiceInstance{
ID: "127.0.0.1:9090", ID: "127.0.0.1:9090",
@ -53,6 +54,7 @@ func TestDirect(t *testing.T) {
func TestDirectError(t *testing.T) { func TestDirectError(t *testing.T) {
b := &Builder{} b := &Builder{}
wn := b.Build(selector.NewNode( wn := b.Build(selector.NewNode(
"http",
"127.0.0.1:9090", "127.0.0.1:9090",
&registry.ServiceInstance{ &registry.ServiceInstance{
ID: "127.0.0.1:9090", ID: "127.0.0.1:9090",
@ -89,6 +91,7 @@ func TestDirectErrorHandler(t *testing.T) {
}, },
} }
wn := b.Build(selector.NewNode( wn := b.Build(selector.NewNode(
"http",
"127.0.0.1:9090", "127.0.0.1:9090",
&registry.ServiceInstance{ &registry.ServiceInstance{
ID: "127.0.0.1:9090", ID: "127.0.0.1:9090",

@ -21,6 +21,7 @@ func TestWrr3(t *testing.T) {
for i := 0; i < 3; i++ { for i := 0; i < 3; i++ {
addr := fmt.Sprintf("127.0.0.%d:8080", i) addr := fmt.Sprintf("127.0.0.%d:8080", i)
nodes = append(nodes, selector.NewNode( nodes = append(nodes, selector.NewNode(
"http",
addr, addr,
&registry.ServiceInstance{ &registry.ServiceInstance{
ID: addr, ID: addr,
@ -96,6 +97,7 @@ func TestOne(t *testing.T) {
for i := 0; i < 1; i++ { for i := 0; i < 1; i++ {
addr := fmt.Sprintf("127.0.0.%d:8080", i) addr := fmt.Sprintf("127.0.0.%d:8080", i)
nodes = append(nodes, selector.NewNode( nodes = append(nodes, selector.NewNode(
"http",
addr, addr,
&registry.ServiceInstance{ &registry.ServiceInstance{
ID: addr, ID: addr,

@ -13,6 +13,7 @@ func TestWrr(t *testing.T) {
random := New(WithFilter(filter.Version("v2.0.0"))) random := New(WithFilter(filter.Version("v2.0.0")))
var nodes []selector.Node var nodes []selector.Node
nodes = append(nodes, selector.NewNode( nodes = append(nodes, selector.NewNode(
"http",
"127.0.0.1:8080", "127.0.0.1:8080",
&registry.ServiceInstance{ &registry.ServiceInstance{
ID: "127.0.0.1:8080", ID: "127.0.0.1:8080",
@ -20,6 +21,7 @@ func TestWrr(t *testing.T) {
Metadata: map[string]string{"weight": "10"}, Metadata: map[string]string{"weight": "10"},
})) }))
nodes = append(nodes, selector.NewNode( nodes = append(nodes, selector.NewNode(
"http",
"127.0.0.1:9090", "127.0.0.1:9090",
&registry.ServiceInstance{ &registry.ServiceInstance{
ID: "127.0.0.1:9090", ID: "127.0.0.1:9090",

@ -31,6 +31,9 @@ type Builder interface {
// Node is node interface. // Node is node interface.
type Node interface { type Node interface {
// Scheme is service node scheme
Scheme() string
// Address is the unique address under the same service // Address is the unique address under the same service
Address() string Address() string

@ -89,6 +89,7 @@ func TestDefault(t *testing.T) {
selector := builder.Build() selector := builder.Build()
var nodes []Node var nodes []Node
nodes = append(nodes, NewNode( nodes = append(nodes, NewNode(
"http",
"127.0.0.1:8080", "127.0.0.1:8080",
&registry.ServiceInstance{ &registry.ServiceInstance{
ID: "127.0.0.1:8080", ID: "127.0.0.1:8080",
@ -98,6 +99,7 @@ func TestDefault(t *testing.T) {
Metadata: map[string]string{"weight": "10"}, Metadata: map[string]string{"weight": "10"},
})) }))
nodes = append(nodes, NewNode( nodes = append(nodes, NewNode(
"http",
"127.0.0.1:9090", "127.0.0.1:9090",
&registry.ServiceInstance{ &registry.ServiceInstance{
ID: "127.0.0.1:9090", ID: "127.0.0.1:9090",

@ -14,6 +14,7 @@ func TestWrr(t *testing.T) {
wrr := New(WithFilter(filter.Version("v2.0.0"))) wrr := New(WithFilter(filter.Version("v2.0.0")))
var nodes []selector.Node var nodes []selector.Node
nodes = append(nodes, selector.NewNode( nodes = append(nodes, selector.NewNode(
"http",
"127.0.0.1:8080", "127.0.0.1:8080",
&registry.ServiceInstance{ &registry.ServiceInstance{
ID: "127.0.0.1:8080", ID: "127.0.0.1:8080",
@ -21,6 +22,7 @@ func TestWrr(t *testing.T) {
Metadata: map[string]string{"weight": "10"}, Metadata: map[string]string{"weight": "10"},
})) }))
nodes = append(nodes, selector.NewNode( nodes = append(nodes, selector.NewNode(
"http",
"127.0.0.1:9090", "127.0.0.1:9090",
&registry.ServiceInstance{ &registry.ServiceInstance{
ID: "127.0.0.1:9090", ID: "127.0.0.1:9090",

@ -58,7 +58,7 @@ func (b *Builder) Build(info base.PickerBuildInfo) gBalancer.Picker {
for conn, info := range info.ReadySCs { for conn, info := range info.ReadySCs {
ins, _ := info.Address.Attributes.Value("rawServiceInstance").(*registry.ServiceInstance) ins, _ := info.Address.Attributes.Value("rawServiceInstance").(*registry.ServiceInstance)
nodes = append(nodes, &grpcNode{ nodes = append(nodes, &grpcNode{
Node: selector.NewNode(info.Address.Addr, ins), Node: selector.NewNode("grpc", info.Address.Addr, ins),
subConn: conn, subConn: conn,
}) })
} }

@ -122,7 +122,7 @@ func (r *resolver) update(services []*registry.ServiceInstance) bool {
if ept == "" { if ept == "" {
continue continue
} }
nodes = append(nodes, selector.NewNode(ept, ins)) nodes = append(nodes, selector.NewNode("http", ept, ins))
} }
if len(nodes) == 0 { if len(nodes) == 0 {
r.logger.Warnf("[http resolver]Zero endpoint found,refused to write,set: %s ins: %v", r.target.Endpoint, nodes) r.logger.Warnf("[http resolver]Zero endpoint found,refused to write,set: %s ins: %v", r.target.Endpoint, nodes)

Loading…
Cancel
Save