From 86dec76aa354ff16e775bb69fb9851bafc20cfb1 Mon Sep 17 00:00:00 2001 From: longxboy Date: Mon, 18 Oct 2021 19:26:00 +0800 Subject: [PATCH] fix global selector bug (#1564) --- selector/balancer.go | 5 +++++ selector/default.go | 16 ++++++++++++++++ selector/p2c/p2c.go | 33 ++++++++++++++++++++++----------- selector/random/random.go | 34 +++++++++++++++++++++++----------- selector/selector.go | 5 +++++ selector/wrr/wrr.go | 34 ++++++++++++++++++++++------------ transport/grpc/balancer.go | 14 +++++++------- 7 files changed, 100 insertions(+), 41 deletions(-) diff --git a/selector/balancer.go b/selector/balancer.go index 161072bd9..9e5accbdc 100644 --- a/selector/balancer.go +++ b/selector/balancer.go @@ -10,6 +10,11 @@ type Balancer interface { Pick(ctx context.Context, nodes []WeightedNode) (selected WeightedNode, done DoneFunc, err error) } +// BalancerBuilder build balancer +type BalancerBuilder interface { + Build() Balancer +} + // WeightedNode calculates scheduling weight in real time type WeightedNode interface { Node diff --git a/selector/default.go b/selector/default.go index 25698e718..0e08d615e 100644 --- a/selector/default.go +++ b/selector/default.go @@ -52,3 +52,19 @@ func (d *Default) Apply(nodes []Node) { d.weightedNodes = weightedNodes d.lk.Unlock() } + +// DefaultBuilder is de +type DefaultBuilder struct { + Node WeightedNodeBuilder + Balancer BalancerBuilder + Filters []Filter +} + +// Build create builder +func (db *DefaultBuilder) Build() Selector { + return &Default{ + NodeBuilder: db.Node, + Balancer: db.Balancer.Build(), + Filters: db.Filters, + } +} diff --git a/selector/p2c/p2c.go b/selector/p2c/p2c.go index 817bccd1e..732c0c8b1 100644 --- a/selector/p2c/p2c.go +++ b/selector/p2c/p2c.go @@ -35,17 +35,7 @@ type options struct { // New creates a p2c selector. func New(opts ...Option) selector.Selector { - var option options - for _, opt := range opts { - opt(&option) - } - return &selector.Default{ - NodeBuilder: &ewma.Builder{}, - Balancer: &Balancer{ - r: rand.New(rand.NewSource(time.Now().UnixNano())), - }, - Filters: option.filters, - } + return NewBuilder(opts...).Build() } // Balancer is p2c selector. @@ -92,3 +82,24 @@ func (s *Balancer) Pick(ctx context.Context, nodes []selector.WeightedNode) (sel done := pc.Pick() return pc, done, nil } + +// NewBuilder returns a selector builder with p2c balancer +func NewBuilder(opts ...Option) selector.Builder { + var option options + for _, opt := range opts { + opt(&option) + } + return &selector.DefaultBuilder{ + Filters: option.filters, + Balancer: &Builder{}, + Node: &ewma.Builder{}, + } +} + +// Builder is p2c builder +type Builder struct{} + +// Build creates Balancer +func (b *Builder) Build() selector.Balancer { + return &Balancer{r: rand.New(rand.NewSource(time.Now().UnixNano()))} +} diff --git a/selector/random/random.go b/selector/random/random.go index 14a194df7..48c3e0aa5 100644 --- a/selector/random/random.go +++ b/selector/random/random.go @@ -33,18 +33,9 @@ type options struct { // Balancer is a random balancer. type Balancer struct{} -// New random a selector. +// New an random selector. func New(opts ...Option) selector.Selector { - var option options - for _, opt := range opts { - opt(&option) - } - - return &selector.Default{ - Balancer: &Balancer{}, - NodeBuilder: &direct.Builder{}, - Filters: option.filters, - } + return NewBuilder(opts...).Build() } // Pick pick a weighted node. @@ -57,3 +48,24 @@ func (p *Balancer) Pick(_ context.Context, nodes []selector.WeightedNode) (selec d := selected.Pick() return selected, d, nil } + +// NewBuilder returns a selector builder with random balancer +func NewBuilder(opts ...Option) selector.Builder { + var option options + for _, opt := range opts { + opt(&option) + } + return &selector.DefaultBuilder{ + Filters: option.filters, + Balancer: &Builder{}, + Node: &direct.Builder{}, + } +} + +// Builder is random builder +type Builder struct{} + +// Build creates Balancer +func (b *Builder) Build() selector.Balancer { + return &Balancer{} +} diff --git a/selector/selector.go b/selector/selector.go index 21572ac7b..3df4776ea 100644 --- a/selector/selector.go +++ b/selector/selector.go @@ -24,6 +24,11 @@ type Rebalancer interface { Apply(nodes []Node) } +// Builder build selector +type Builder interface { + Build() Selector +} + // Node is node interface. type Node interface { // Address is the unique address under the same service diff --git a/selector/wrr/wrr.go b/selector/wrr/wrr.go index e5898daf4..4b0dfc6a0 100644 --- a/selector/wrr/wrr.go +++ b/selector/wrr/wrr.go @@ -38,18 +38,7 @@ type Balancer struct { // New random a selector. func New(opts ...Option) selector.Selector { - var option options - for _, opt := range opts { - opt(&option) - } - - return &selector.Default{ - Balancer: &Balancer{ - currentWeight: make(map[string]float64), - }, - NodeBuilder: &direct.Builder{}, - Filters: option.filters, - } + return NewBuilder(opts...).Build() } // Pick pick a weighted node. @@ -80,3 +69,24 @@ func (p *Balancer) Pick(_ context.Context, nodes []selector.WeightedNode) (selec d := selected.Pick() return selected, d, nil } + +// NewBuilder returns a selector builder with wrr balancer +func NewBuilder(opts ...Option) selector.Builder { + var option options + for _, opt := range opts { + opt(&option) + } + return &selector.DefaultBuilder{ + Filters: option.filters, + Balancer: &Builder{}, + Node: &direct.Builder{}, + } +} + +// Builder is wrr builder +type Builder struct{} + +// Build creates Balancer +func (b *Builder) Build() selector.Balancer { + return &Balancer{currentWeight: make(map[string]float64)} +} diff --git a/transport/grpc/balancer.go b/transport/grpc/balancer.go index 3814742fe..c25fa2b1a 100644 --- a/transport/grpc/balancer.go +++ b/transport/grpc/balancer.go @@ -25,19 +25,19 @@ var ( func init() { // inject global grpc balancer - SetGlobalBalancer(random.Name, random.New()) - SetGlobalBalancer(wrr.Name, wrr.New()) - SetGlobalBalancer(p2c.Name, p2c.New()) + SetGlobalBalancer(random.Name, random.NewBuilder()) + SetGlobalBalancer(wrr.Name, wrr.NewBuilder()) + SetGlobalBalancer(p2c.Name, p2c.NewBuilder()) } // SetGlobalBalancer set grpc balancer with scheme. -func SetGlobalBalancer(scheme string, selector selector.Selector) { +func SetGlobalBalancer(scheme string, builder selector.Builder) { mu.Lock() defer mu.Unlock() b := base.NewBalancerBuilder( scheme, - &Builder{selector}, + &Builder{builder: builder}, base.Config{HealthCheck: true}, ) gBalancer.Register(b) @@ -45,7 +45,7 @@ func SetGlobalBalancer(scheme string, selector selector.Selector) { // Builder is grpc balancer builder. type Builder struct { - selector selector.Selector + builder selector.Builder } // Build creates a grpc Picker. @@ -62,7 +62,7 @@ func (b *Builder) Build(info base.PickerBuildInfo) gBalancer.Picker { nodes = append(nodes, node.New(info.Address.Addr, ins)) } p := &Picker{ - selector: b.selector, + selector: b.builder.Build(), subConns: subConns, } p.selector.Apply(nodes)