add zero endpoint protection (#1215)

* add zero endpoint protection
pull/1217/head
longxboy 4 years ago committed by GitHub
parent 44dd641f28
commit a02d9b4192
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      transport/grpc/client.go
  2. 29
      transport/grpc/resolver/discovery/builder.go
  3. 4
      transport/grpc/resolver/discovery/resolver.go
  4. 2
      transport/http/client.go
  5. 10
      transport/http/resolver.go

@ -84,7 +84,7 @@ func DialInsecure(ctx context.Context, opts ...ClientOption) (*grpc.ClientConn,
func dial(ctx context.Context, insecure bool, opts ...ClientOption) (*grpc.ClientConn, error) {
options := clientOptions{
timeout: 500 * time.Millisecond,
timeout: 2000 * time.Millisecond,
}
for _, o := range opts {
o(&options)

@ -2,6 +2,7 @@ package discovery
import (
"context"
"time"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/registry"
@ -15,14 +16,22 @@ type Option func(o *builder)
// WithLogger with builder logger.
func WithLogger(logger log.Logger) Option {
return func(o *builder) {
o.logger = logger
return func(b *builder) {
b.logger = logger
}
}
// WithTimeout with timeout option.
func WithTimeout(timeout time.Duration) Option {
return func(b *builder) {
b.timeout = timeout
}
}
type builder struct {
discoverer registry.Discovery
logger log.Logger
timeout time.Duration
}
// NewBuilder creates a builder which is used to factory registry resolvers.
@ -30,6 +39,7 @@ func NewBuilder(d registry.Discovery, opts ...Option) resolver.Builder {
b := &builder{
discoverer: d,
logger: log.DefaultLogger,
timeout: time.Second * 10,
}
for _, o := range opts {
o(b)
@ -37,23 +47,28 @@ func NewBuilder(d registry.Discovery, opts ...Option) resolver.Builder {
return b
}
func (d *builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
w, err := d.discoverer.Watch(context.Background(), target.Endpoint)
func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
ctx, cancel := context.WithTimeout(context.Background(), b.timeout)
defer cancel()
w, err := b.discoverer.Watch(ctx, target.Endpoint)
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
r := &discoveryResolver{
w: w,
cc: cc,
ctx: ctx,
cancel: cancel,
log: log.NewHelper(d.logger),
log: log.NewHelper(b.logger),
}
r.ctx, r.cancel = context.WithCancel(context.Background())
go r.watch()
return r, nil
}
func (d *builder) Scheme() string {
// Scheme return scheme of discovery
func (*builder) Scheme() string {
return name
}

@ -56,6 +56,10 @@ func (r *discoveryResolver) update(ins []*registry.ServiceInstance) {
}
addrs = append(addrs, addr)
}
if len(addrs) == 0 {
r.log.Warnf("[resovler]Zero endpoint found,refused to write, ins: %v", ins)
return
}
r.cc.UpdateState(resolver.State{Addresses: addrs})
}

@ -139,7 +139,7 @@ type Client struct {
func NewClient(ctx context.Context, opts ...ClientOption) (*Client, error) {
options := clientOptions{
ctx: ctx,
timeout: 500 * time.Millisecond,
timeout: 2000 * time.Millisecond,
encoder: DefaultRequestEncoder,
decoder: DefaultResponseDecoder,
errorDecoder: DefaultErrorDecoder,

@ -86,10 +86,12 @@ func newResolver(ctx context.Context, discovery registry.Discovery, target *Targ
r.lock.Lock()
r.nodes = nodes
r.lock.Unlock()
}
if block && !executed {
executed = true
done <- nil
if block && !executed {
executed = true
done <- nil
}
} else {
r.logger.Warnf("[http resovler]Zero endpoint found,refused to write,ser: %s ins: %v", target.Endpoint, nodes)
}
}
}()

Loading…
Cancel
Save