feat(endpoint): add endpoint parser (#1273)

* feat(endpoint): add endpoint parser

* fix parseTarget ut

* fix insecure testing
pull/1275/head
Tony Chen 3 years ago committed by GitHub
parent 8f4e78b47d
commit bc35f20228
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 64
      examples/etcd_tls/cert/server.crt
  2. 81
      examples/etcd_tls/client/main.go
  3. 75
      examples/etcd_tls/server/main.go
  4. 65
      examples/etcd_tls/server2/main.go
  5. 132
      examples/tls/tls_test.go
  6. 22
      internal/endpoint/endpoint.go
  7. 22
      transport/grpc/resolver/discovery/resolver.go
  8. 4
      transport/grpc/server.go
  9. 38
      transport/http/client.go
  10. 34
      transport/http/resolver.go
  11. 8
      transport/http/resolver_test.go

@ -1,64 +0,0 @@
-----BEGIN CERTIFICATE-----
MIIFZTCCA02gAwIBAgIBATANBgkqhkiG9w0BAQUFADB8MQswCQYDVQQGEwJDTjEL
MAkGA1UECAwCU0gxCzAJBgNVBAcMAlNIMQ8wDQYDVQQKDAZrcmF0b3MxCzAJBgNV
BAsMAklUMRMwEQYDVQQDDAprcmF0b3MuY29tMSAwHgYJKoZIhvcNAQkBFhFrcmF0
b3NAa3JhdG9zLmNvbTAeFw0yMTA3MjcxNTM1MjJaFw0yMjA3MjcxNTM1MjJaME0x
CzAJBgNVBAYTAkNOMQswCQYDVQQIDAJTSDELMAkGA1UEBwwCU0gxDzANBgNVBAoM
BmtyYXRvczETMBEGA1UEAwwKa3JhdG9zLmNvbTCCAiIwDQYJKoZIhvcNAQEBBQAD
ggIPADCCAgoCggIBALfEWAuPqUHTwTiOT8dtiCM3aEm1D7I6K/PY2mSDMMOI4f5z
TYi7LsKLLItMQR44cEDein/kl0U0QNYJRHqVCr/3IXA4ds0maiq+npY/S2KABDiI
Z38TZ1PZ2bcD3Jb0o1gw0GcSokOGzVtuNKSiASc3D711AepGerChD5UrbixrZdg2
wAeWZj39Tl93+zCcldVrCHMSM7LmDluHJ5KZ8T6auuK6ypQPVhqLz8VseHhB/IRw
s9/o9OlJ3kv4wB/CWlFIvU6ZGVnshZAGOk3Brq25kw600RRDr2MpxNhFY1Xvrt7D
tAJ7fK3/3VaV+I2C4OzzmztK7T1WlL8XqZj6I3t1ZCZryMvDIUPzC4mgiRqEJiF7
VzuVm1sjyTKkD4oX6ZjJYXJ/6pbvd8+AexuwVAsQJKqaF1iQC9jThg55RUTkGo8F
DFErW7XHKHe8vKXuRLG5k3xZBiHK7gsVyHzx0ouuSuZMFHg7L9ACeeqtqxqKNd+0
fIo4N0vNb3GEj+YaTLoadhDSBEsynyQNTfrDf+oFRmQUg3q0W4VJHJFhkmTHRRcq
Qj6xEAJDHC1xr8yr3jif9BKWG2+zEbvpiTcRXKhycv2OUI11dK72aMnOycusJjRe
8pOqcYhSVQZnz31WWlkmX9TRiQtEeUUkFCYAnIArSKm5rNwOddLCzC8Z4js1AgMB
AAGjITAfMB0GA1UdEQQWMBSCDCoua3JhdG9zLmNvbYcEfwAAATANBgkqhkiG9w0B
AQUFAAOCAgEAjr3SXzNOcN8+JQuroS6hKHadrcp6djepd3r5YKSEjKBNxVAU0gj6
QGl0zjSqhxSFwN4wCqXU/4JJVOyAJCV+t992j5wNdaGI+Tcu4whK2LtPi0O68ttq
Nn5H/8bmotW0IZ/YDcq1V8EVWiTZPECk4QLx26S2sjG4HOKNUAs8o+PoUmQE5bKJ
XBFWmjsOfPnI0WBGnuCvGUw5wP1ipLiuK+OhoTNKA6SXPopm5KMDv7gjYPlcmPyI
sJcpve75m9EXQxrDvJtvws8MIZnkWvWi7bW6uQ7274S7YjMZL09/sQTolQOtwl59
pvEbkQNPzgdvQYAfrlJjSBtbq3OtHF+j+p2K+7R0TY2F1OW3LeV8vkcq7IOt38Er
FK5feNEL3t9GlrF8ASmJp/JvhWQiJo5tZJxWZ68CjKfLmVd406ehsK5XNlHJemnl
hNpuAegeV6WDglvMNavvQCJfK5mKokn73HujtQveZ8vwPKV4f6Wg3sHJ2yyP7ou2
2UZ10Qbp6UvFYfYuMvLuq8kznapWqQ0dIJzPDs+CzVtjk1eINUF9UjsSvqX4oCvy
+ZvaM9k4kU/fJMRkLstc/Dx2G13T/moI/l5sw0qFtck12zs/SQ7xgcW3b0ruGX9S
11opfl5R86GpXXbz+vNL9fWP+8cIvoGZK8RAC872bcMPEoJPjPIwAbI=
-----END CERTIFICATE-----
-----BEGIN CERTIFICATE-----
MIIFdDCCA1wCCQDHq+cGa349DzANBgkqhkiG9w0BAQsFADB8MQswCQYDVQQGEwJD
TjELMAkGA1UECAwCU0gxCzAJBgNVBAcMAlNIMQ8wDQYDVQQKDAZrcmF0b3MxCzAJ
BgNVBAsMAklUMRMwEQYDVQQDDAprcmF0b3MuY29tMSAwHgYJKoZIhvcNAQkBFhFr
cmF0b3NAa3JhdG9zLmNvbTAeFw0yMTA3MjcxNTMwMzlaFw0zMTA3MjUxNTMwMzla
MHwxCzAJBgNVBAYTAkNOMQswCQYDVQQIDAJTSDELMAkGA1UEBwwCU0gxDzANBgNV
BAoMBmtyYXRvczELMAkGA1UECwwCSVQxEzARBgNVBAMMCmtyYXRvcy5jb20xIDAe
BgkqhkiG9w0BCQEWEWtyYXRvc0BrcmF0b3MuY29tMIICIjANBgkqhkiG9w0BAQEF
AAOCAg8AMIICCgKCAgEApy2GV9MiECuelNk3fz1Qwh6+wj8Ip11NG+LxEGK4/MLD
JRJtbgAg/7s3vzrm4WDKATDO27W6wewNFOvEnGWyh9wyjAtSgnAcJreq7F2DMbpO
+E2guIQHSCCzfa10s4BgwXKdBRPPvwTADIHXPtlbq4BItJqzt/AhLQbdDAp93mHX
NCzFdlIr4wflT2OW7EO24K2LgMZLWCzaESei9fL6AYm7jEvfaFYZksI3rjJNAj1q
wccMu1o6TvdWRA5fvBi6h15Z0ekR8C2LbM1A54zziZwd+YjcwdQHJJgWJFH7yNSt
Oe/AJZzP1nRk/5H0EvxBnF7du6vfeSjZJytp8cXMlbYg4NGGkSy782tBaUaDIb43
iLqSjfHVZDLzbDGNy/u/mzfo4xS8lxZ92zE7z21d0WyUAJ75Z72v4kaNTr2tnMuE
NTTG1787e3NB0CaV6gjeP8XbMV8gwNTrJmTW3dS+DT6sKtTIISOvsUCZ6h5qFvKU
RWqJ7MiaSxg31DPg51caYDjiVLkkES8GRpPM/Njsg9WpFTQeqcecKbbOdw0ihvoV
fq0FgHpp+jbjm0KkmcNdSX7Ld5XeHp2rBPkA283IdXIAvjjthlyWJTmSP6kDDdEI
km4Did1Bg0wcXNnFlbHHavCfeRTQbVoIYVkWH7I323Vp8itvKobz1GirSOK0Hr0C
AwEAATANBgkqhkiG9w0BAQsFAAOCAgEAFBYIOOyABIbUUOjjjvx2FSDRBNLpee5O
45KmznuCGerhR7ad3rUNhaakA9HJzmLMUXtmyzy6+ej0HzdqZE7RRzdDVftBGaOf
thwEzUAiHfqeX0o039sTQvJSqUY2sBko+tyDRmeNtmd3TPE5VZZSWG+TUtrOofr7
K28UquMthJrmtSC8IJQOvA78Nc/FCPNaGZrcS8ZvrgbrkCLPN4dIJvY9I38xaWQ+
G0gNQazxPzdp89/UMzkczyJAKjYj4JyLCbrjzTZXjtu+rYJezxyS+3QSf0F0xVBr
/8HCeXX6xG16WZY54Z6AijqI3sjiRBSQ25rLJJ00sWa9k1oH0Poyiv4pQG3RHgIs
jJEQh7RQE5zAsBx2NHZ8FcsUGX2oOjSS+vtX//Bg37kN7oYx5HEtK9c+a0wWUxo8
8cqIzqrQSWOd1CipxbE5CUUNKdGQzNCLTfLO68KitLdaSOCGoU/PaoAncbr6EjdB
kzqJcHooqq8asl6fu1DVnYqCpEEp30ldU3p4MdclcMV0XZUq6bXFq1ylmfZXfC0t
zSGnBUjxB5lohn5fs1S/DxyoUR8LbIFVWCljEf4jJtMRnCQV3bR3xeVTbGh3ti81
21uhQKjIP/X2BRNAvRo1qUbrzEeHAJG3EbIG78rRFvSz6MkWVTZzeH1SgCr9Onw7
djovWE7E6ic=
-----END CERTIFICATE-----

@ -1,81 +0,0 @@
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"github.com/go-kratos/kratos/v2/transport/http"
"io/ioutil"
"log"
"time"
"github.com/go-kratos/etcd/registry"
"github.com/go-kratos/kratos/examples/helloworld/helloworld"
"github.com/go-kratos/kratos/v2/transport/grpc"
clientv3 "go.etcd.io/etcd/client/v3"
)
func main() {
b, err := ioutil.ReadFile("../cert/server.crt")
if err != nil {
panic(err)
}
cp := x509.NewCertPool()
if !cp.AppendCertsFromPEM(b) {
panic(err)
}
tlsConf := &tls.Config{ServerName: "www.kratos.com", RootCAs: cp}
cli, err := clientv3.New(clientv3.Config{
Endpoints: []string{"127.0.0.1:2379"},
})
if err != nil {
panic(err)
}
r := registry.New(cli)
for {
callGRPC(r, tlsConf)
callHTTP(r, tlsConf)
time.Sleep(time.Second)
}
}
func callGRPC(r *registry.Registry, tlsConf *tls.Config) {
conn, err := grpc.Dial(
context.Background(),
grpc.WithEndpoint("discovery:///helloworld"),
grpc.WithDiscovery(r),
grpc.WithTLSConfig(tlsConf),
)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := helloworld.NewGreeterClient(conn)
reply, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "kratos"})
if err != nil {
log.Fatal(err)
}
log.Printf("[grpc] SayHello %+v\n", reply)
}
func callHTTP(r *registry.Registry, tlsConf *tls.Config) {
conn, err := http.NewClient(
context.Background(),
http.WithEndpoint("discovery:///helloworld"),
http.WithDiscovery(r),
http.WithBlock(),
http.WithTLSConfig(tlsConf),
)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := helloworld.NewGreeterHTTPClient(conn)
reply, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "kratos"})
if err != nil {
log.Fatal(err)
}
log.Printf("[http] SayHello %+v\n", reply)
}

@ -1,75 +0,0 @@
package main
import (
"context"
"crypto/tls"
"fmt"
"github.com/go-kratos/kratos/v2/transport/http"
"log"
"github.com/go-kratos/etcd/registry"
pb "github.com/go-kratos/kratos/examples/helloworld/helloworld"
"github.com/go-kratos/kratos/v2"
"github.com/go-kratos/kratos/v2/middleware/recovery"
"github.com/go-kratos/kratos/v2/transport/grpc"
etcd "go.etcd.io/etcd/client/v3"
)
// server is used to implement helloworld.GreeterServer.
type server struct {
pb.UnimplementedGreeterServer
}
// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
return &pb.HelloReply{Message: fmt.Sprintf("use tls:Welcome %+v!", in.Name)}, nil
}
func main() {
cert, err := tls.LoadX509KeyPair("../cert/server.crt", "../cert/server.key")
if err != nil {
panic(err)
}
tlsConf := &tls.Config{Certificates: []tls.Certificate{cert}}
client, err := etcd.New(etcd.Config{
Endpoints: []string{"127.0.0.1:2379"},
})
if err != nil {
log.Fatal(err)
}
grpcSrv := grpc.NewServer(
grpc.Address(":9000"),
grpc.Middleware(
recovery.Recovery(),
),
grpc.TLSConfig(tlsConf),
)
httpSrv := http.NewServer(
http.Address(":8000"),
http.Middleware(
recovery.Recovery(),
),
http.TLSConfig(tlsConf),
)
s := &server{}
pb.RegisterGreeterServer(grpcSrv, s)
pb.RegisterGreeterHTTPServer(httpSrv, s)
r := registry.New(client)
app := kratos.New(
kratos.Name("helloworld"),
kratos.Server(
grpcSrv,
httpSrv,
),
kratos.Registrar(r),
)
if err := app.Run(); err != nil {
log.Fatal(err)
}
}

@ -1,65 +0,0 @@
package main
import (
"context"
"fmt"
"github.com/go-kratos/kratos/v2/transport/http"
"log"
"github.com/go-kratos/etcd/registry"
pb "github.com/go-kratos/kratos/examples/helloworld/helloworld"
"github.com/go-kratos/kratos/v2"
"github.com/go-kratos/kratos/v2/middleware/recovery"
"github.com/go-kratos/kratos/v2/transport/grpc"
etcd "go.etcd.io/etcd/client/v3"
)
// server is used to implement helloworld.GreeterServer.
type server struct {
pb.UnimplementedGreeterServer
}
// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
return &pb.HelloReply{Message: fmt.Sprintf("not use tls:Welcome %+v!", in.Name)}, nil
}
func main() {
client, err := etcd.New(etcd.Config{
Endpoints: []string{"127.0.0.1:2379"},
})
if err != nil {
log.Fatal(err)
}
grpcSrv := grpc.NewServer(
grpc.Address(":9001"),
grpc.Middleware(
recovery.Recovery(),
),
)
httpSrv := http.NewServer(
http.Address(":8001"),
http.Middleware(
recovery.Recovery(),
),
)
s := &server{}
pb.RegisterGreeterServer(grpcSrv, s)
pb.RegisterGreeterHTTPServer(httpSrv, s)
r := registry.New(client)
app := kratos.New(
kratos.Name("helloworld"),
kratos.Server(
grpcSrv,
httpSrv,
),
kratos.Registrar(r),
)
if err := app.Run(); err != nil {
log.Fatal(err)
}
}

@ -0,0 +1,132 @@
package main
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"io/ioutil"
"testing"
"time"
etcdregistry "github.com/go-kratos/etcd/registry"
"github.com/go-kratos/kratos/examples/helloworld/helloworld"
pb "github.com/go-kratos/kratos/examples/helloworld/helloworld"
"github.com/go-kratos/kratos/v2"
"github.com/go-kratos/kratos/v2/registry"
"github.com/go-kratos/kratos/v2/transport/grpc"
"github.com/go-kratos/kratos/v2/transport/http"
etcd "go.etcd.io/etcd/client/v3"
)
// server is used to implement helloworld.GreeterServer.
type server struct {
pb.UnimplementedGreeterServer
}
// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *pb.HelloRequest) (*pb.HelloReply, error) {
return &pb.HelloReply{Message: fmt.Sprintf("Welcome %+v!", in.Name)}, nil
}
func startServer(r registry.Registrar, c *tls.Config) (app *kratos.App, err error) {
httpSrv := http.NewServer(http.TLSConfig(c))
grpcSrv := grpc.NewServer(grpc.TLSConfig(c))
s := &server{}
pb.RegisterGreeterServer(grpcSrv, s)
pb.RegisterGreeterHTTPServer(httpSrv, s)
app = kratos.New(
kratos.Name("helloworld"),
kratos.Server(
httpSrv,
grpcSrv,
),
kratos.Registrar(r),
kratos.RegistrarTimeout(5*time.Second),
)
go func() {
err = app.Run()
}()
time.Sleep(time.Second)
return
}
func callGRPC(t *testing.T, r registry.Discovery, c *tls.Config) {
conn, err := grpc.Dial(
context.Background(),
grpc.WithEndpoint("discovery:///helloworld"),
grpc.WithTLSConfig(c),
grpc.WithDiscovery(r),
)
if err != nil {
t.Fatal(err)
}
defer conn.Close()
client := helloworld.NewGreeterClient(conn)
reply, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "kratos"})
if err != nil {
t.Fatal(err)
}
t.Logf("[grpc] SayHello %+v\n", reply)
}
func callHTTP(t *testing.T, r registry.Discovery, c *tls.Config) {
conn, err := http.NewClient(
context.Background(),
http.WithEndpoint("discovery:///helloworld"),
http.WithTLSConfig(c),
http.WithDiscovery(r),
http.WithBlock(),
)
if err != nil {
t.Fatal(err)
}
defer conn.Close()
client := helloworld.NewGreeterHTTPClient(conn)
reply, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "kratos"})
if err != nil {
t.Fatal(err)
}
t.Logf("[http] SayHello %+v\n", reply)
}
func TestETCD(t *testing.T) {
client, err := etcd.New(etcd.Config{
Endpoints: []string{"127.0.0.1:2379"},
})
if err != nil {
t.Fatal(err)
}
b, err := ioutil.ReadFile("./cert/server.crt")
if err != nil {
t.Fatal(err)
}
cp := x509.NewCertPool()
if !cp.AppendCertsFromPEM(b) {
t.Fatal(err)
}
cert, err := tls.LoadX509KeyPair("./cert/server.crt", "./cert/server.key")
if err != nil {
t.Fatal(err)
}
tlsConf := &tls.Config{
ServerName: "www.kratos.com",
RootCAs: cp,
Certificates: []tls.Certificate{cert},
}
r := etcdregistry.New(client)
srv, err := startServer(r, nil)
if err != nil {
t.Fatal(err)
}
srvTLS, err := startServer(r, tlsConf)
if err != nil {
t.Fatal(err)
}
callHTTP(t, r, tlsConf)
callGRPC(t, r, tlsConf)
srv.Stop()
srvTLS.Stop()
}

@ -5,6 +5,7 @@ import (
"strconv"
)
// NewEndpoint new an Endpoint URL.
func NewEndpoint(scheme, host string, isSecure bool) *url.URL {
var query string
if isSecure {
@ -13,8 +14,25 @@ func NewEndpoint(scheme, host string, isSecure bool) *url.URL {
return &url.URL{Scheme: scheme, Host: host, RawQuery: query}
}
func IsSecure(url *url.URL) bool {
ok, err := strconv.ParseBool(url.Query().Get("isSecure"))
// ParseEndpoint parses an Endpoint URL.
func ParseEndpoint(endpoints []string, scheme string, isSecure bool) (string, error) {
for _, e := range endpoints {
u, err := url.Parse(e)
if err != nil {
return "", err
}
if u.Scheme == scheme {
if IsSecure(u) == isSecure {
return u.Host, nil
}
}
}
return "", nil
}
// IsSecure parses isSecure for Endpoint URL.
func IsSecure(u *url.URL) bool {
ok, err := strconv.ParseBool(u.Query().Get("isSecure"))
if err != nil {
return false
}

@ -4,10 +4,9 @@ import (
"context"
"encoding/json"
"errors"
"github.com/go-kratos/kratos/v2/internal/endpoint"
"net/url"
"time"
"github.com/go-kratos/kratos/v2/internal/endpoint"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/registry"
"google.golang.org/grpc/attributes"
@ -48,7 +47,7 @@ func (r *discoveryResolver) watch() {
func (r *discoveryResolver) update(ins []*registry.ServiceInstance) {
var addrs []resolver.Address
for _, in := range ins {
endpoint, err := parseEndpoint(in.Endpoints, r.insecure)
endpoint, err := endpoint.ParseEndpoint(in.Endpoints, "grpc", !r.insecure)
if err != nil {
r.log.Errorf("[resovler] Failed to parse discovery endpoint: %v", err)
continue
@ -79,23 +78,6 @@ func (r *discoveryResolver) Close() {
func (r *discoveryResolver) ResolveNow(options resolver.ResolveNowOptions) {}
func parseEndpoint(endpoints []string, insecure bool) (string, error) {
for _, e := range endpoints {
u, err := url.Parse(e)
if err != nil {
return "", err
}
if u.Scheme == "grpc" {
if endpoint.IsSecure(u) != insecure {
return u.Host, nil
}
}
}
return "", nil
}
func parseAttributes(md map[string]string) *attributes.Attributes {
pairs := make([]interface{}, 0, len(md))
for k, v := range md {

@ -3,12 +3,13 @@ package grpc
import (
"context"
"crypto/tls"
"github.com/go-kratos/kratos/v2/internal/endpoint"
"net"
"net/url"
"sync"
"time"
"github.com/go-kratos/kratos/v2/internal/endpoint"
apimd "github.com/go-kratos/kratos/v2/api/metadata"
ic "github.com/go-kratos/kratos/v2/internal/context"
"github.com/go-kratos/kratos/v2/internal/host"
@ -159,7 +160,6 @@ func (s *Server) Endpoint() (*url.URL, error) {
return
}
s.lis = lis
s.endpoint = endpoint.NewEndpoint("grpc", addr, s.tlsConf != nil)
})
if s.err != nil {

@ -12,6 +12,7 @@ import (
"github.com/go-kratos/kratos/v2/encoding"
"github.com/go-kratos/kratos/v2/errors"
"github.com/go-kratos/kratos/v2/internal/endpoint"
"github.com/go-kratos/kratos/v2/internal/host"
"github.com/go-kratos/kratos/v2/internal/httputil"
"github.com/go-kratos/kratos/v2/middleware"
@ -138,10 +139,11 @@ func WithTLSConfig(c *tls.Config) ClientOption {
// Client is an HTTP client.
type Client struct {
opts clientOptions
target *Target
r *resolver
cc *http.Client
opts clientOptions
target *Target
r *resolver
cc *http.Client
insecure bool
}
// NewClient returns an HTTP client.
@ -163,18 +165,15 @@ func NewClient(ctx context.Context, opts ...ClientOption) (*Client, error) {
tr.TLSClientConfig = options.tlsConf
}
}
var isSecure bool
if options.tlsConf != nil {
isSecure = true
}
target, err := parseTarget(options.endpoint, isSecure)
insecure := options.tlsConf == nil
target, err := parseTarget(options.endpoint, insecure)
if err != nil {
return nil, err
}
var r *resolver
if options.discovery != nil {
if target.Scheme == "discovery" {
if r, err = newResolver(ctx, options.discovery, target, options.balancer, options.block, !isSecure); err != nil {
if r, err = newResolver(ctx, options.discovery, target, options.balancer, options.block, insecure); err != nil {
return nil, fmt.Errorf("[http client] new resolver failed!err: %v", options.endpoint)
}
} else if _, _, err := host.ExtractHostPort(options.endpoint); err != nil {
@ -182,9 +181,10 @@ func NewClient(ctx context.Context, opts ...ClientOption) (*Client, error) {
}
}
return &Client{
opts: options,
target: target,
r: r,
opts: options,
target: target,
insecure: insecure,
r: r,
cc: &http.Client{
Timeout: options.timeout,
Transport: options.transport,
@ -244,13 +244,17 @@ func (client *Client) invoke(ctx context.Context, req *http.Request, args interf
if node, done, err = client.opts.balancer.Pick(ctx); err != nil {
return nil, errors.ServiceUnavailable("NODE_NOT_FOUND", err.Error())
}
scheme, addr, err := parseEndpoint(node.Endpoints, client.opts.tlsConf == nil)
endpoint, err := endpoint.ParseEndpoint(node.Endpoints, "http", !client.insecure)
if err != nil {
return nil, errors.ServiceUnavailable("NODE_NOT_FOUND", err.Error())
}
req.URL.Scheme = scheme
req.URL.Host = addr
req.Host = addr
if client.insecure {
req.URL.Scheme = "http"
} else {
req.URL.Scheme = "https"
}
req.URL.Host = endpoint
req.Host = endpoint
}
res, err := client.do(ctx, req, c)
if done != nil {

@ -3,12 +3,12 @@ package http
import (
"context"
"errors"
"github.com/go-kratos/kratos/v2/internal/endpoint"
"net/url"
"strings"
"sync"
"time"
"github.com/go-kratos/kratos/v2/internal/endpoint"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/registry"
)
@ -25,12 +25,12 @@ type Target struct {
Endpoint string
}
func parseTarget(endpoint string, isSecure bool) (*Target, error) {
func parseTarget(endpoint string, insecure bool) (*Target, error) {
if !strings.Contains(endpoint, "://") {
if isSecure {
endpoint = "https://" + endpoint
} else {
if insecure {
endpoint = "http://" + endpoint
} else {
endpoint = "https://" + endpoint
}
}
u, err := url.Parse(endpoint)
@ -96,7 +96,6 @@ func newResolver(ctx context.Context, discovery registry.Discovery, target *Targ
return nil, ctx.Err()
}
}
go func() {
for {
services, err := watcher.Next()
@ -111,14 +110,13 @@ func newResolver(ctx context.Context, discovery registry.Discovery, target *Targ
r.update(services)
}
}()
return r, nil
}
func (r *resolver) update(services []*registry.ServiceInstance) {
var nodes []*registry.ServiceInstance
for _, in := range services {
_, endpoint, err := parseEndpoint(in.Endpoints, r.insecure)
endpoint, err := endpoint.ParseEndpoint(in.Endpoints, "http", !r.insecure)
if err != nil {
r.logger.Errorf("Failed to parse (%v) discovery endpoint: %v error %v", r.target, in.Endpoints, err)
continue
@ -141,23 +139,3 @@ func (r *resolver) update(services []*registry.ServiceInstance) {
func (r *resolver) Close() error {
return r.watcher.Stop()
}
func parseEndpoint(endpoints []string, insecure bool) (string, string, error) {
for _, e := range endpoints {
u, err := url.Parse(e)
if err != nil {
return "", "", err
}
if u.Scheme == "http" {
isSecure := endpoint.IsSecure(u)
scheme := "http"
if isSecure {
scheme = "https"
}
if isSecure != insecure {
return scheme, u.Host, nil
}
}
}
return "", "", nil
}

@ -7,15 +7,15 @@ import (
)
func TestParseTarget(t *testing.T) {
target, err := parseTarget("localhost:8000", false)
target, err := parseTarget("localhost:8000", true)
assert.Nil(t, err)
assert.Equal(t, &Target{Scheme: "http", Authority: "localhost:8000"}, target)
target, err = parseTarget("discovery:///demo", false)
target, err = parseTarget("discovery:///demo", true)
assert.Nil(t, err)
assert.Equal(t, &Target{Scheme: "discovery", Authority: "", Endpoint: "demo"}, target)
target, err = parseTarget("127.0.0.1:8000", false)
target, err = parseTarget("127.0.0.1:8000", true)
assert.Nil(t, err)
assert.Equal(t, &Target{Scheme: "http", Authority: "127.0.0.1:8000"}, target)
@ -23,7 +23,7 @@ func TestParseTarget(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, &Target{Scheme: "https", Authority: "127.0.0.1:8000"}, target)
target, err = parseTarget("127.0.0.1:8000", true)
target, err = parseTarget("127.0.0.1:8000", false)
assert.Nil(t, err)
assert.Equal(t, &Target{Scheme: "https", Authority: "127.0.0.1:8000"}, target)
}

Loading…
Cancel
Save