diff --git a/transport/grpc/client.go b/transport/grpc/client.go index 4e87df7c2..ab307419a 100644 --- a/transport/grpc/client.go +++ b/transport/grpc/client.go @@ -84,7 +84,7 @@ func DialInsecure(ctx context.Context, opts ...ClientOption) (*grpc.ClientConn, func dial(ctx context.Context, insecure bool, opts ...ClientOption) (*grpc.ClientConn, error) { options := clientOptions{ - timeout: 500 * time.Millisecond, + timeout: 2000 * time.Millisecond, } for _, o := range opts { o(&options) diff --git a/transport/grpc/resolver/discovery/builder.go b/transport/grpc/resolver/discovery/builder.go index 7846b5fcb..0d834b8bf 100644 --- a/transport/grpc/resolver/discovery/builder.go +++ b/transport/grpc/resolver/discovery/builder.go @@ -2,6 +2,7 @@ package discovery import ( "context" + "time" "github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/registry" @@ -15,14 +16,22 @@ type Option func(o *builder) // WithLogger with builder logger. func WithLogger(logger log.Logger) Option { - return func(o *builder) { - o.logger = logger + return func(b *builder) { + b.logger = logger + } +} + +// WithTimeout with timeout option. +func WithTimeout(timeout time.Duration) Option { + return func(b *builder) { + b.timeout = timeout } } type builder struct { discoverer registry.Discovery logger log.Logger + timeout time.Duration } // NewBuilder creates a builder which is used to factory registry resolvers. @@ -30,6 +39,7 @@ func NewBuilder(d registry.Discovery, opts ...Option) resolver.Builder { b := &builder{ discoverer: d, logger: log.DefaultLogger, + timeout: time.Second * 10, } for _, o := range opts { o(b) @@ -37,23 +47,28 @@ func NewBuilder(d registry.Discovery, opts ...Option) resolver.Builder { return b } -func (d *builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { - w, err := d.discoverer.Watch(context.Background(), target.Endpoint) +func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) { + ctx, cancel := context.WithTimeout(context.Background(), b.timeout) + defer cancel() + w, err := b.discoverer.Watch(ctx, target.Endpoint) if err != nil { return nil, err } - ctx, cancel := context.WithCancel(context.Background()) + r := &discoveryResolver{ w: w, cc: cc, ctx: ctx, cancel: cancel, - log: log.NewHelper(d.logger), + log: log.NewHelper(b.logger), } + r.ctx, r.cancel = context.WithCancel(context.Background()) go r.watch() + return r, nil } -func (d *builder) Scheme() string { +// Scheme return scheme of discovery +func (*builder) Scheme() string { return name } diff --git a/transport/grpc/resolver/discovery/resolver.go b/transport/grpc/resolver/discovery/resolver.go index 15cc9417d..19725beab 100644 --- a/transport/grpc/resolver/discovery/resolver.go +++ b/transport/grpc/resolver/discovery/resolver.go @@ -56,6 +56,10 @@ func (r *discoveryResolver) update(ins []*registry.ServiceInstance) { } addrs = append(addrs, addr) } + if len(addrs) == 0 { + r.log.Warnf("[resovler]Zero endpoint found,refused to write, ins: %v", ins) + return + } r.cc.UpdateState(resolver.State{Addresses: addrs}) } diff --git a/transport/http/client.go b/transport/http/client.go index dad491cce..8a5e422c2 100644 --- a/transport/http/client.go +++ b/transport/http/client.go @@ -139,7 +139,7 @@ type Client struct { func NewClient(ctx context.Context, opts ...ClientOption) (*Client, error) { options := clientOptions{ ctx: ctx, - timeout: 500 * time.Millisecond, + timeout: 2000 * time.Millisecond, encoder: DefaultRequestEncoder, decoder: DefaultResponseDecoder, errorDecoder: DefaultErrorDecoder, diff --git a/transport/http/resolver.go b/transport/http/resolver.go index a200477c7..77ed26999 100644 --- a/transport/http/resolver.go +++ b/transport/http/resolver.go @@ -86,10 +86,12 @@ func newResolver(ctx context.Context, discovery registry.Discovery, target *Targ r.lock.Lock() r.nodes = nodes r.lock.Unlock() - } - if block && !executed { - executed = true - done <- nil + if block && !executed { + executed = true + done <- nil + } + } else { + r.logger.Warnf("[http resovler]Zero endpoint found,refused to write,ser: %s ins: %v", target.Endpoint, nodes) } } }()