keep balancer nodes (#1110)

pull/1112/head
longxboy 4 years ago committed by GitHub
parent 8d30b6d489
commit 953c91d354
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      transport/http/balancer/balancer.go
  2. 15
      transport/http/balancer/random/random.go
  3. 9
      transport/http/client.go
  4. 8
      transport/http/resolver.go

@ -14,5 +14,8 @@ type DoneInfo struct {
// Balancer is node pick balancer
type Balancer interface {
Pick(ctx context.Context, nodes []*registry.ServiceInstance) (node *registry.ServiceInstance, done func(context.Context, DoneInfo), err error)
// Pick one node
Pick(ctx context.Context) (node *registry.ServiceInstance, done func(context.Context, DoneInfo), err error)
// Update nodes when nodes removed or added
Update(nodes []*registry.ServiceInstance)
}

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math/rand"
"sync"
"github.com/go-kratos/kratos/v2/registry"
"github.com/go-kratos/kratos/v2/transport/http/balancer"
@ -12,13 +13,19 @@ import (
var _ balancer.Balancer = &Balancer{}
type Balancer struct {
lock sync.RWMutex
nodes []*registry.ServiceInstance
}
func New() *Balancer {
return &Balancer{}
}
func (b *Balancer) Pick(ctx context.Context, nodes []*registry.ServiceInstance) (node *registry.ServiceInstance, done func(context.Context, balancer.DoneInfo), err error) {
func (b *Balancer) Pick(ctx context.Context) (node *registry.ServiceInstance, done func(context.Context, balancer.DoneInfo), err error) {
b.lock.RLock()
nodes := b.nodes
b.lock.RUnlock()
if len(nodes) == 0 {
return nil, nil, fmt.Errorf("no instances avaiable")
}
@ -28,3 +35,9 @@ func (b *Balancer) Pick(ctx context.Context, nodes []*registry.ServiceInstance)
idx := rand.Intn(len(nodes))
return nodes[idx], func(context.Context, balancer.DoneInfo) {}, nil
}
func (b *Balancer) Update(nodes []*registry.ServiceInstance) {
b.lock.Lock()
defer b.lock.Unlock()
b.nodes = nodes
}

@ -147,7 +147,7 @@ func NewClient(ctx context.Context, opts ...ClientOption) (*Client, error) {
var r *resolver
if options.discovery != nil {
if target.Scheme == "discovery" {
if r, err = newResolver(ctx, options.discovery, target); err != nil {
if r, err = newResolver(ctx, options.discovery, target, options.balancer); err != nil {
return nil, fmt.Errorf("[http client] new resolver failed!err: %v", options.endpoint)
}
} else {
@ -211,11 +211,10 @@ func (client *Client) invoke(ctx context.Context, req *http.Request, args interf
var done func(context.Context, balancer.DoneInfo)
if client.r != nil {
var (
err error
node *registry.ServiceInstance
nodes = client.r.fetch(ctx)
err error
node *registry.ServiceInstance
)
if node, done, err = client.opts.balancer.Pick(ctx, nodes); err != nil {
if node, done, err = client.opts.balancer.Pick(ctx); err != nil {
return nil, errors.ServiceUnavailable("NODE_NOT_FOUND", err.Error())
}
scheme, addr, err := parseEndpoint(node.Endpoints)

@ -9,6 +9,11 @@ import (
"github.com/go-kratos/kratos/v2/registry"
)
// Updater is resolver nodes updater
type Updater interface {
Update(nodes []*registry.ServiceInstance)
}
// Target is resolver target
type Target struct {
Scheme string
@ -39,7 +44,7 @@ type resolver struct {
logger *log.Helper
}
func newResolver(ctx context.Context, discovery registry.Discovery, target *Target) (*resolver, error) {
func newResolver(ctx context.Context, discovery registry.Discovery, target *Target, updater Updater) (*resolver, error) {
watcher, err := discovery.Watch(ctx, target.Endpoint)
if err != nil {
return nil, err
@ -69,6 +74,7 @@ func newResolver(ctx context.Context, discovery registry.Discovery, target *Targ
nodes = append(nodes, in)
}
if len(nodes) != 0 {
updater.Update(nodes)
r.lock.Lock()
r.nodes = nodes
r.lock.Unlock()

Loading…
Cancel
Save