You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
kratos/selector/p2c/p2c.go

106 lines
2.5 KiB

package p2c
import (
"context"
"math/rand"
"sync/atomic"
"time"
"github.com/go-kratos/kratos/v2/selector"
"github.com/go-kratos/kratos/v2/selector/node/ewma"
)
const (
forcePick = time.Second * 3
// Name is balancer name
Name = "p2c"
)
var _ selector.Balancer = &Balancer{}
// WithFilter with select filters
func WithFilter(filters ...selector.Filter) Option {
return func(o *options) {
o.filters = filters
}
}
// Option is random builder option.
type Option func(o *options)
// options is random builder options
type options struct {
filters []selector.Filter
}
// New creates a p2c selector.
func New(opts ...Option) selector.Selector {
return NewBuilder(opts...).Build()
}
// Balancer is p2c selector.
type Balancer struct {
r *rand.Rand
lk int64
}
// choose two distinct nodes.
func (s *Balancer) prePick(nodes []selector.WeightedNode) (nodeA selector.WeightedNode, nodeB selector.WeightedNode) {
source := rand.NewSource(time.Now().UnixNano())
a := rand.New(source).Intn(len(nodes))
b := rand.New(source).Intn(len(nodes) - 1)
if b >= a {
b = b + 1
}
nodeA, nodeB = nodes[a], nodes[b]
return
}
// Pick pick a node.
func (s *Balancer) Pick(ctx context.Context, nodes []selector.WeightedNode) (selector.WeightedNode, selector.DoneFunc, error) {
if len(nodes) == 0 {
return nil, nil, selector.ErrNoAvailable
} else if len(nodes) == 1 {
done := nodes[0].Pick()
return nodes[0], done, nil
}
var pc, upc selector.WeightedNode
nodeA, nodeB := s.prePick(nodes)
// meta.Weight is the weight set by the service publisher in discovery
if nodeB.Weight() > nodeA.Weight() {
pc, upc = nodeB, nodeA
} else {
pc, upc = nodeA, nodeB
}
// If the failed node has never been selected once during forceGap, it is forced to be selected once
// Take advantage of forced opportunities to trigger updates of success rate and delay
if upc.PickElapsed() > forcePick && atomic.CompareAndSwapInt64(&s.lk, 0, 1) {
pc = upc
atomic.StoreInt64(&s.lk, 0)
}
done := pc.Pick()
return pc, done, nil
}
// NewBuilder returns a selector builder with p2c balancer
func NewBuilder(opts ...Option) selector.Builder {
var option options
for _, opt := range opts {
opt(&option)
}
return &selector.DefaultBuilder{
Filters: option.filters,
Balancer: &Builder{},
Node: &ewma.Builder{},
}
}
// Builder is p2c builder
type Builder struct{}
// Build creates Balancer
func (b *Builder) Build() selector.Balancer {
return &Balancer{r: rand.New(rand.NewSource(time.Now().UnixNano()))}
}