|
|
|
@ -20,7 +20,7 @@ import ( |
|
|
|
|
"github.com/go-kratos/kratos/v2/transport/http/balancer/random" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// Client is http client
|
|
|
|
|
// Client is an HTTP client.
|
|
|
|
|
type Client struct { |
|
|
|
|
cc *http.Client |
|
|
|
|
r *resolver |
|
|
|
@ -36,11 +36,6 @@ type Client struct { |
|
|
|
|
discovery registry.Discovery |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
const ( |
|
|
|
|
// errNodeNotFound represents service node not found.
|
|
|
|
|
errNodeNotFound = "NODE_NOT_FOUND" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// DecodeErrorFunc is decode error func.
|
|
|
|
|
type DecodeErrorFunc func(ctx context.Context, res *http.Response) error |
|
|
|
|
|
|
|
|
@ -132,7 +127,7 @@ func WithBalancer(b balancer.Balancer) ClientOption { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Client is a HTTP transport client.
|
|
|
|
|
// Client is an HTTP transport client.
|
|
|
|
|
type clientOptions struct { |
|
|
|
|
ctx context.Context |
|
|
|
|
transport http.RoundTripper |
|
|
|
@ -154,11 +149,10 @@ func NewClient(ctx context.Context, opts ...ClientOption) (*Client, error) { |
|
|
|
|
ctx: ctx, |
|
|
|
|
scheme: "http", |
|
|
|
|
timeout: 1 * time.Second, |
|
|
|
|
encoder: defaultRequestEncoder, |
|
|
|
|
decoder: defaultResponseDecoder, |
|
|
|
|
errorDecoder: defaultErrorDecoder, |
|
|
|
|
encoder: DefaultRequestEncoder, |
|
|
|
|
decoder: DefaultResponseDecoder, |
|
|
|
|
errorDecoder: DefaultErrorDecoder, |
|
|
|
|
transport: http.DefaultTransport, |
|
|
|
|
discovery: nil, |
|
|
|
|
balancer: random.New(), |
|
|
|
|
} |
|
|
|
|
for _, o := range opts { |
|
|
|
@ -259,18 +253,18 @@ func (client *Client) invoke(ctx context.Context, req *http.Request, args interf |
|
|
|
|
if client.r != nil { |
|
|
|
|
nodes := client.r.fetch(ctx) |
|
|
|
|
if len(nodes) == 0 { |
|
|
|
|
return nil, errors.ServiceUnavailable(errNodeNotFound, "fetch error") |
|
|
|
|
return nil, errors.ServiceUnavailable("NODE_NOT_FOUND", "fetch error") |
|
|
|
|
} |
|
|
|
|
var node *registry.ServiceInstance |
|
|
|
|
var err error |
|
|
|
|
node, done, err = client.b.Pick(ctx, c.pathPattern, nodes) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, errors.ServiceUnavailable(errNodeNotFound, err.Error()) |
|
|
|
|
return nil, errors.ServiceUnavailable("NODE_NOT_FOUND", err.Error()) |
|
|
|
|
} |
|
|
|
|
req = req.Clone(ctx) |
|
|
|
|
addr, err := parseEndpoint(client.scheme, node.Endpoints) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, errors.ServiceUnavailable(errNodeNotFound, err.Error()) |
|
|
|
|
return nil, errors.ServiceUnavailable("NODE_NOT_FOUND", err.Error()) |
|
|
|
|
} |
|
|
|
|
req.URL.Host = addr |
|
|
|
|
} |
|
|
|
@ -317,7 +311,8 @@ func (client *Client) do(ctx context.Context, req *http.Request, c callInfo) (*h |
|
|
|
|
return resp, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func defaultRequestEncoder(ctx context.Context, in interface{}) (string, []byte, error) { |
|
|
|
|
// DefaultRequestEncoder is an HTTP request encoder.
|
|
|
|
|
func DefaultRequestEncoder(ctx context.Context, in interface{}) (string, []byte, error) { |
|
|
|
|
body, err := encoding.GetCodec("json").Marshal(in) |
|
|
|
|
if err != nil { |
|
|
|
|
return "", nil, err |
|
|
|
@ -325,16 +320,18 @@ func defaultRequestEncoder(ctx context.Context, in interface{}) (string, []byte, |
|
|
|
|
return "application/json", body, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func defaultResponseDecoder(ctx context.Context, res *http.Response, v interface{}) error { |
|
|
|
|
// DefaultResponseDecoder is an HTTP response decoder.
|
|
|
|
|
func DefaultResponseDecoder(ctx context.Context, res *http.Response, v interface{}) error { |
|
|
|
|
defer res.Body.Close() |
|
|
|
|
data, err := ioutil.ReadAll(res.Body) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
return codecForResponse(res).Unmarshal(data, v) |
|
|
|
|
return CodecForResponse(res).Unmarshal(data, v) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func defaultErrorDecoder(ctx context.Context, res *http.Response) error { |
|
|
|
|
// DefaultErrorDecoder is an HTTP error decoder.
|
|
|
|
|
func DefaultErrorDecoder(ctx context.Context, res *http.Response) error { |
|
|
|
|
if res.StatusCode >= 200 && res.StatusCode <= 299 { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
@ -342,20 +339,18 @@ func defaultErrorDecoder(ctx context.Context, res *http.Response) error { |
|
|
|
|
data, err := ioutil.ReadAll(res.Body) |
|
|
|
|
if err == nil { |
|
|
|
|
e := new(errors.Error) |
|
|
|
|
if err = codecForResponse(res).Unmarshal(data, e); err == nil { |
|
|
|
|
if err = CodecForResponse(res).Unmarshal(data, e); err == nil { |
|
|
|
|
return e |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return errors.Errorf(res.StatusCode, errors.UnknownReason, err.Error()) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func codecForResponse(r *http.Response) encoding.Codec { |
|
|
|
|
// CodecForResponse get encoding.Codec via http.Response
|
|
|
|
|
func CodecForResponse(r *http.Response) encoding.Codec { |
|
|
|
|
codec := encoding.GetCodec(httputil.ContentSubtype("Content-Type")) |
|
|
|
|
if codec != nil { |
|
|
|
|
return codec |
|
|
|
|
} |
|
|
|
|
if codec == nil { |
|
|
|
|
codec = encoding.GetCodec("json") |
|
|
|
|
} |
|
|
|
|
return codec |
|
|
|
|
return encoding.GetCodec("json") |
|
|
|
|
} |
|
|
|
|