fix(grpc/resolver): fix builder context (#1258)

* fix grpc resovler ctx


* fix client close

* upgrade etcd to v3.5.0

* clean code
pull/1261/head
Tony Chen 3 years ago committed by GitHub
parent 025ae38acc
commit 7773d15256
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      examples/go.mod
  2. 25
      examples/go.sum
  3. 13
      examples/registry/consul/client/main.go
  4. 14
      examples/registry/consul/server/main.go
  5. 10
      examples/registry/etcd/client/main.go
  6. 15
      examples/registry/etcd/server/main.go
  7. 4
      examples/registry/nacos/client/main.go
  8. 25
      examples/registry/nacos/server/main.go
  9. 18
      examples/registry/zookeeper/client/main.go
  10. 12
      examples/registry/zookeeper/server/main.go
  11. 21
      transport/grpc/resolver/discovery/builder.go
  12. 14
      transport/grpc/resolver/discovery/resolver.go
  13. 5
      transport/http/client.go
  14. 8
      transport/http/resolver.go

@ -27,11 +27,11 @@ require (
github.com/longXboy/grpc-gateway/v2 v2.0.0-20210707031540-bd2d73d86cee
github.com/nacos-group/nacos-sdk-go v1.0.7
github.com/nicksnyder/go-i18n/v2 v2.1.2
github.com/prometheus/client_golang v1.9.0
github.com/prometheus/client_golang v1.11.0
github.com/segmentio/kafka-go v0.4.17
github.com/sirupsen/logrus v1.8.1
github.com/stretchr/testify v1.7.0
go.etcd.io/etcd/client/v3 v3.5.0-beta.4
go.etcd.io/etcd/client/v3 v3.5.0
go.opentelemetry.io/otel v1.0.0-RC1
go.opentelemetry.io/otel/exporters/jaeger v1.0.0-RC1
go.opentelemetry.io/otel/sdk v1.0.0-RC1

@ -171,6 +171,7 @@ github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2
github.com/go-kit/kit v0.8.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.9.0/go.mod h1:xBxKIO96dXMWWy0MnWVtmwkA9/13aqxPnvrjFYMA2as=
github.com/go-kit/kit v0.10.0/go.mod h1:xUsJbQ/Fp4kEt7AFgCuvyX4a71u8h9jB8tj/ORgOZ7o=
github.com/go-kit/log v0.1.0/go.mod h1:zbhenjAZHb184qTLMA9ZjW7ThYL0H2mk7Q6pNt4vbaY=
github.com/go-kratos/consul v0.1.2 h1:vryYF2o3wWQzFyzqKKKc1+GX/FY9T7DSkM3nc/Hy6/s=
github.com/go-kratos/consul v0.1.2/go.mod h1:FhxdCfkqaqIYfgpGbzGb8ywM/zV3jk+EBS+WSy1FzQQ=
github.com/go-kratos/etcd v0.1.0 h1:ewAfloZcpcg2L1QB1VLXYUMdvfUU0i3kAlToHrxTv2g=
@ -555,8 +556,9 @@ github.com/prometheus/client_golang v1.3.0/go.mod h1:hJaj2vgQTGQmVCsAACORcieXFeD
github.com/prometheus/client_golang v1.4.0/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU=
github.com/prometheus/client_golang v1.5.1/go.mod h1:e9GMxYsXl05ICDXkRhurwBS4Q3OK1iX/F2sw+iXX5zU=
github.com/prometheus/client_golang v1.7.1/go.mod h1:PY5Wy2awLA44sXw4AOSfFBetzPP4j5+D6mVACh+pe2M=
github.com/prometheus/client_golang v1.9.0 h1:Rrch9mh17XcxvEu9D9DEpb4isxjGBtcevQjKvxPRQIU=
github.com/prometheus/client_golang v1.9.0/go.mod h1:FqZLKOZnGdFAhOK4nqGHa7D66IdsO+O441Eve7ptJDU=
github.com/prometheus/client_golang v1.11.0 h1:HNkLOAEQMIDv/K+04rukrLx6ch7msSRwf3/SASFAGtQ=
github.com/prometheus/client_golang v1.11.0/go.mod h1:Z6t4BnS23TR94PD6BsDNk8yVqroYurpAkEiz0P2BEV0=
github.com/prometheus/client_model v0.0.0-20180712105110-5c3871d89910/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190115171406-56726106282f/go.mod h1:MbSGuTsp3dbXC40dX6PRTWyKYBIrTGTE9sqQNg2J8bo=
github.com/prometheus/client_model v0.0.0-20190129233127-fd36f4220a90/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
@ -571,16 +573,18 @@ github.com/prometheus/common v0.4.1/go.mod h1:TNfzLD0ON7rHzMJeJkieUDPYmFC7Snx/y8
github.com/prometheus/common v0.7.0/go.mod h1:DjGbpBbp5NYNiECxcL/VnbXCCaQpKd3tt26CguLLsqA=
github.com/prometheus/common v0.9.1/go.mod h1:yhUN8i9wzaXS3w1O07YhxHEBxD+W35wd8bs7vj7HSQ4=
github.com/prometheus/common v0.10.0/go.mod h1:Tlit/dnDKsSWFlCLTWaA1cyBgKHSMdTB80sz/V91rCo=
github.com/prometheus/common v0.15.0 h1:4fgOnadei3EZvgRwxJ7RMpG1k1pOZth5Pc13tyspaKM=
github.com/prometheus/common v0.15.0/go.mod h1:U+gB1OBLb1lF3O42bTCL+FK18tX9Oar16Clt/msog/s=
github.com/prometheus/common v0.26.0 h1:iMAkS2TDoNWnKM+Kopnx/8tnEStIfpYA0ur0xQzzhMQ=
github.com/prometheus/common v0.26.0/go.mod h1:M7rCNAaPfAosfx8veZJCuw84e35h3Cfd9VFqTh1DIvc=
github.com/prometheus/procfs v0.0.0-20181005140218-185b4288413d/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20190117184657-bf6a532e95b1/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk=
github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.2/go.mod h1:TjEm7ze935MbeOT/UhFTIMYKhuLP4wbCsTZCD3I8kEA=
github.com/prometheus/procfs v0.0.8/go.mod h1:7Qr8sr6344vo1JqZ6HhLceV9o3AJ1Ff+GxbHq6oeK9A=
github.com/prometheus/procfs v0.1.3/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.2.0 h1:wH4vA7pcjKuZzjF7lM8awk4fnuJO6idemZXoKnULUx4=
github.com/prometheus/procfs v0.2.0/go.mod h1:lV6e/gmhEcM9IjHGsFOCxxuZ+z1YqCvr4OA4YeYWdaU=
github.com/prometheus/procfs v0.6.0 h1:mxy4L2jP6qMonqmq+aTtOx1ifVWUgG/TAmntgbh3xv4=
github.com/prometheus/procfs v0.6.0/go.mod h1:cz+aTbrPOrUb4q7XlbU9ygM+/jj0fzG6c1xBZuNvfVA=
github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU=
github.com/rakyll/statik v0.1.7 h1:OF3QCZUuyPxuGEP7B4ypUa7sB/iHtqOTDYZXGM8KOdQ=
github.com/rakyll/statik v0.1.7/go.mod h1:AlZONWzMtEnMs7W4e/1LURLiI49pIMmp6V9Unghqrcc=
@ -671,13 +675,16 @@ go.etcd.io/bbolt v1.3.3/go.mod h1:IbVyRI1SCnLcuJnV2u8VeU0CEYM7e686BmAb1XKL+uU=
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738 h1:VcrIfasaLFkyjk6KNlXQSzO+B0fZcnECiDrKJsfxka0=
go.etcd.io/etcd v0.0.0-20191023171146-3cf2f69b5738/go.mod h1:dnLIgRNXwCJa5e+c6mIZCrds/GIG4ncV9HhK5PX7jPg=
go.etcd.io/etcd/api/v3 v3.5.0-alpha.0/go.mod h1:mPcW6aZJukV6Aa81LSKpBjQXTWlXB5r74ymPoSWa3Sw=
go.etcd.io/etcd/api/v3 v3.5.0-beta.4 h1:etIejKeELg3fIXt0i71TXtx1OjK9q+oegcv00zipiis=
go.etcd.io/etcd/api/v3 v3.5.0-beta.4/go.mod h1:yF0YUmBghT48aC0/eTFrhULo+uKQAr5spQQ6sRhPauE=
go.etcd.io/etcd/client/pkg/v3 v3.5.0-beta.4 h1:IVvCfkch8truS86wSy67AbnXCYq8nYpM8NPTW14Ttp0=
go.etcd.io/etcd/api/v3 v3.5.0 h1:GsV3S+OfZEOCNXdtNkBSR7kgLobAa/SO6tCxRa0GAYw=
go.etcd.io/etcd/api/v3 v3.5.0/go.mod h1:cbVKeC6lCfl7j/8jBhAK6aIYO9XOjdptoxU/nLQcPvs=
go.etcd.io/etcd/client/pkg/v3 v3.5.0-beta.4/go.mod h1:a+pbz+UrcOpvve1Qxf6tGovi15PjgtRhi0QTO2Nlc4U=
go.etcd.io/etcd/client/pkg/v3 v3.5.0 h1:2aQv6F436YnN7I4VbI8PPYrBhu+SmrTaADcf8Mi/6PU=
go.etcd.io/etcd/client/pkg/v3 v3.5.0/go.mod h1:IJHfcCEKxYu1Os13ZdwCwIUTUVGYTSAM3YSwc9/Ac1g=
go.etcd.io/etcd/client/v3 v3.5.0-alpha.0/go.mod h1:wKt7jgDgf/OfKiYmCq5WFGxOFAkVMLxiiXgLDFhECr8=
go.etcd.io/etcd/client/v3 v3.5.0-beta.4 h1:AW/Sj3Oq7ZYjgNsYU0xad/tu3BH/Jnn2OuLvQoUNMEM=
go.etcd.io/etcd/client/v3 v3.5.0-beta.4/go.mod h1:0L1RulN1QSXq6uKPMUSX+OTAYyFkapMK7iUHXXIH/1E=
go.etcd.io/etcd/client/v3 v3.5.0 h1:62Eh0XOro+rDwkrypAGDfgmNh5Joq+z+W9HZdlXMzek=
go.etcd.io/etcd/client/v3 v3.5.0/go.mod h1:AIKXXVX/DQXtfTEqBryiLTUXwON+GuvO6Z7lLS/oTh0=
go.etcd.io/etcd/pkg/v3 v3.5.0-alpha.0/go.mod h1:tV31atvwzcybuqejDoY3oaNRTtlD2l/Ot78Pc9w7DMY=
go.opencensus.io v0.20.1/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
go.opencensus.io v0.20.2/go.mod h1:6WKK9ahsWS3RSO+PY9ZHZUfv2irvY6gN279GOPZjmmk=
@ -732,6 +739,7 @@ go.uber.org/zap v1.13.0/go.mod h1:zwrFLgMcdUuIBviXEYEH1YKNaOBnKXsx2IPda5bBwHM=
go.uber.org/zap v1.15.0/go.mod h1:Mb2vm2krFEG5DV0W9qcHBYFtp/Wku1cvYaqPsS/WYfc=
go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ=
go.uber.org/zap v1.16.1-0.20210329175301-c23abee72d19/go.mod h1:aMfIlz3TDBfB0BwTCKFU1XbEmj9zevr5S5LcBr85MXw=
go.uber.org/zap v1.17.0/go.mod h1:MXVU+bhUf/A7Xi2HNOnopQOrmycQ5Ih87HtOu4q5SSo=
go.uber.org/zap v1.18.1 h1:CSUJ2mjFszzEWt4CdKISEuChVIXGBn3lAPwkRGyVrc4=
go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI=
golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
@ -840,6 +848,7 @@ golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@ -898,10 +907,12 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20201214210602-f9fddec55a1e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210112080510-489259a85091/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210119212857-b64e53b001e4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4=
golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
@ -974,6 +985,7 @@ golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc
golang.org/x/tools v0.0.0-20201224043029-2b0845dc783e/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.4 h1:cVngSRcfgyZCzys3KYOpCFa+4dqX/Oub9tAq00ttGVs=
golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@ -1036,6 +1048,7 @@ google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7Fc
google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0=
google.golang.org/genproto v0.0.0-20210629200056-84d6f6074151/go.mod h1:yiaVoXHpRzHGyxV3o4DktVWY4mSUErTKaeEOq6C3t3U=
google.golang.org/genproto v0.0.0-20210701191553-46259e63a0a9 h1:HBPuvo39L0DgfVn9eHR3ki/RjZoUFWa+em77e7KFDfs=
google.golang.org/genproto v0.0.0-20210701191553-46259e63a0a9/go.mod h1:yiaVoXHpRzHGyxV3o4DktVWY4mSUErTKaeEOq6C3t3U=

@ -18,8 +18,11 @@ func main() {
if err != nil {
panic(err)
}
callHTTP(client)
callGRPC(client)
for {
callHTTP(client)
callGRPC(client)
time.Sleep(time.Second)
}
}
func callGRPC(cli *api.Client) {
@ -32,8 +35,9 @@ func callGRPC(cli *api.Client) {
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := helloworld.NewGreeterClient(conn)
reply, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "kratos_grpc"})
reply, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "kratos"})
if err != nil {
log.Fatal(err)
}
@ -53,9 +57,10 @@ func callHTTP(cli *api.Client) {
if err != nil {
log.Fatal(err)
}
defer conn.Close()
time.Sleep(time.Millisecond * 250)
client := helloworld.NewGreeterHTTPClient(conn)
reply, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "kratos_http"})
reply, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "kratos"})
if err != nil {
log.Fatal(err)
}

@ -29,25 +29,25 @@ func main() {
if err != nil {
log.Fatal(err)
}
r := registry.New(consulClient)
grpcSrv := grpc.NewServer(
grpc.Address(":9000"),
grpc.Middleware(
recovery.Recovery(),
),
)
httpSrv := http.NewServer(
http.Address(":8000"),
http.Middleware(
recovery.Recovery(),
),
)
grpcSrv := grpc.NewServer(
grpc.Address(":9000"),
grpc.Middleware(
recovery.Recovery(),
),
)
s := &server{}
helloworld.RegisterGreeterServer(grpcSrv, s)
helloworld.RegisterGreeterHTTPServer(httpSrv, s)
r := registry.New(consulClient)
app := kratos.New(
kratos.Name("helloworld"),
kratos.Server(

@ -3,6 +3,7 @@ package main
import (
"context"
"log"
"time"
"github.com/go-kratos/etcd/registry"
"github.com/go-kratos/kratos/examples/helloworld/helloworld"
@ -19,8 +20,11 @@ func main() {
panic(err)
}
r := registry.New(cli)
callGRPC(r)
callHTTP(r)
for {
callHTTP(r)
callGRPC(r)
time.Sleep(time.Second)
}
}
func callGRPC(r *registry.Registry) {
@ -32,6 +36,7 @@ func callGRPC(r *registry.Registry) {
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := helloworld.NewGreeterClient(conn)
reply, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "kratos"})
if err != nil {
@ -50,6 +55,7 @@ func callHTTP(r *registry.Registry) {
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := helloworld.NewGreeterHTTPClient(conn)
reply, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "kratos"})
if err != nil {

@ -31,25 +31,30 @@ func main() {
if err != nil {
log.Fatal(err)
}
r := registry.New(client)
grpcSrv := grpc.NewServer(
grpc.Address(":9000"),
)
httpSrv := http.NewServer(
http.Address(":8000"),
http.Middleware(
recovery.Recovery(),
),
)
grpcSrv := grpc.NewServer(
grpc.Address(":9000"),
grpc.Middleware(
recovery.Recovery(),
),
)
s := &server{}
pb.RegisterGreeterServer(grpcSrv, s)
pb.RegisterGreeterHTTPServer(httpSrv, s)
r := registry.New(client)
app := kratos.New(
kratos.Name("helloworld"),
kratos.Server(
grpcSrv,
httpSrv,
grpcSrv,
),
kratos.Registrar(r),
)

@ -18,7 +18,7 @@ func main() {
}
cc := &constant.ClientConfig{
NamespaceId: "public", //namespace id
NamespaceId: "public",
TimeoutMs: 5000,
NotLoadCacheAtStart: true,
LogDir: "/tmp/nacos/log",
@ -28,7 +28,6 @@ func main() {
LogLevel: "debug",
}
// a more graceful way to create naming client
cli, err := clients.NewNamingClient(
vo.NacosClientParam{
ClientConfig: cc,
@ -47,6 +46,7 @@ func main() {
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := helloworld.NewGreeterClient(conn)
reply, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "kratos"})

@ -7,7 +7,9 @@ import (
pb "github.com/go-kratos/kratos/examples/helloworld/helloworld"
"github.com/go-kratos/kratos/v2"
"github.com/go-kratos/kratos/v2/middleware/recovery"
"github.com/go-kratos/kratos/v2/transport/grpc"
"github.com/go-kratos/kratos/v2/transport/http"
"github.com/go-kratos/nacos/registry"
"github.com/nacos-group/nacos-sdk-go/clients"
"github.com/nacos-group/nacos-sdk-go/common/constant"
@ -30,7 +32,7 @@ func main() {
}
cc := constant.ClientConfig{
NamespaceId: "public", //namespace id
NamespaceId: "public",
TimeoutMs: 5000,
NotLoadCacheAtStart: true,
LogDir: "/tmp/nacos/log",
@ -40,7 +42,6 @@ func main() {
LogLevel: "debug",
}
// a more graceful way to create naming client
client, err := clients.NewNamingClient(
vo.NacosClientParam{
ClientConfig: &cc,
@ -51,17 +52,31 @@ func main() {
log.Panic(err)
}
httpSrv := http.NewServer(
http.Address(":8000"),
http.Middleware(
recovery.Recovery(),
),
)
grpcSrv := grpc.NewServer(
grpc.Address(":9109"),
grpc.Address(":9000"),
grpc.Middleware(
recovery.Recovery(),
),
)
s := &server{}
pb.RegisterGreeterServer(grpcSrv, s)
pb.RegisterGreeterHTTPServer(httpSrv, s)
r := registry.New(client)
app := kratos.New(
kratos.Name("helloworld"),
kratos.Server(grpcSrv),
kratos.Registrar(registry.New(client)),
kratos.Server(
httpSrv,
grpcSrv,
),
kratos.Registrar(r),
)
if err := app.Run(); err != nil {
log.Fatal(err)

@ -3,9 +3,9 @@ package main
import (
"context"
"log"
"time"
"github.com/go-kratos/kratos/examples/helloworld/helloworld"
"github.com/go-kratos/kratos/v2/middleware/recovery"
"github.com/go-kratos/kratos/v2/transport/grpc"
"github.com/go-kratos/kratos/v2/transport/http"
"github.com/go-kratos/zookeeper/registry"
@ -16,8 +16,11 @@ func main() {
if err != nil {
panic(err)
}
callHTTP(r)
callGRPC(r)
for {
callHTTP(r)
callGRPC(r)
time.Sleep(time.Second)
}
}
func callGRPC(r *registry.Registry) {
@ -29,8 +32,9 @@ func callGRPC(r *registry.Registry) {
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := helloworld.NewGreeterClient(conn)
reply, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "kratos_grpc"})
reply, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "kratos"})
if err != nil {
log.Fatal(err)
}
@ -40,9 +44,6 @@ func callGRPC(r *registry.Registry) {
func callHTTP(r *registry.Registry) {
conn, err := http.NewClient(
context.Background(),
http.WithMiddleware(
recovery.Recovery(),
),
http.WithEndpoint("discovery:///helloworld"),
http.WithDiscovery(r),
http.WithBlock(),
@ -50,8 +51,9 @@ func callHTTP(r *registry.Registry) {
if err != nil {
log.Fatal(err)
}
defer conn.Close()
client := helloworld.NewGreeterHTTPClient(conn)
reply, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "kratos_http"})
reply, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "kratos"})
if err != nil {
log.Fatal(err)
}

@ -29,18 +29,18 @@ func main() {
log.Fatal(err)
}
grpcSrv := grpc.NewServer(
grpc.Address(":9000"),
grpc.Middleware(
recovery.Recovery(),
),
)
httpSrv := http.NewServer(
http.Address(":8000"),
http.Middleware(
recovery.Recovery(),
),
)
grpcSrv := grpc.NewServer(
grpc.Address(":9000"),
grpc.Middleware(
recovery.Recovery(),
),
)
s := &server{}
helloworld.RegisterGreeterServer(grpcSrv, s)

@ -48,13 +48,24 @@ func NewBuilder(d registry.Discovery, opts ...Option) resolver.Builder {
}
func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts resolver.BuildOptions) (resolver.Resolver, error) {
ctx, cancel := context.WithTimeout(context.Background(), b.timeout)
defer cancel()
w, err := b.discoverer.Watch(ctx, target.Endpoint)
var (
err error
w registry.Watcher
)
done := make(chan bool, 1)
ctx, cancel := context.WithCancel(context.Background())
go func() {
w, err = b.discoverer.Watch(ctx, target.Endpoint)
close(done)
}()
select {
case <-done:
case <-time.After(b.timeout):
}
if err != nil {
cancel()
return nil, err
}
r := &discoveryResolver{
w: w,
cc: cc,
@ -62,9 +73,7 @@ func (b *builder) Build(target resolver.Target, cc resolver.ClientConn, opts res
cancel: cancel,
log: log.NewHelper(b.logger),
}
r.ctx, r.cancel = context.WithCancel(context.Background())
go r.watch()
return r, nil
}

@ -2,6 +2,8 @@ package discovery
import (
"context"
"encoding/json"
"errors"
"net/url"
"time"
@ -27,10 +29,12 @@ func (r *discoveryResolver) watch() {
return
default:
}
ins, err := r.w.Next()
if err != nil {
r.log.Errorf("Failed to watch discovery endpoint: %v", err)
if errors.Is(err, context.Canceled) {
return
}
r.log.Errorf("[resovler] Failed to watch discovery endpoint: %v", err)
time.Sleep(time.Second)
continue
}
@ -43,7 +47,7 @@ func (r *discoveryResolver) update(ins []*registry.ServiceInstance) {
for _, in := range ins {
endpoint, err := parseEndpoint(in.Endpoints)
if err != nil {
r.log.Errorf("Failed to parse discovery endpoint: %v", err)
r.log.Errorf("[resovler] Failed to parse discovery endpoint: %v", err)
continue
}
if endpoint == "" {
@ -57,10 +61,12 @@ func (r *discoveryResolver) update(ins []*registry.ServiceInstance) {
addrs = append(addrs, addr)
}
if len(addrs) == 0 {
r.log.Warnf("[resovler]Zero endpoint found,refused to write, ins: %v", ins)
r.log.Warnf("[resovler] Zero endpoint found,refused to write, instances: %v", ins)
return
}
r.cc.UpdateState(resolver.State{Addresses: addrs})
b, _ := json.Marshal(ins)
r.log.Infof("[resolver] update instances: %s", b)
}
func (r *discoveryResolver) Close() {

@ -283,6 +283,11 @@ func (client *Client) do(ctx context.Context, req *http.Request, c callInfo) (*h
return resp, nil
}
// Close tears down the Transport and all underlying connections.
func (client *Client) Close() error {
return client.r.Close()
}
// DefaultRequestEncoder is an HTTP request encoder.
func DefaultRequestEncoder(ctx context.Context, contentType string, in interface{}) ([]byte, error) {
name := httputil.ContentSubtype(contentType)

@ -2,6 +2,7 @@ package http
import (
"context"
"errors"
"net/url"
"sync"
"time"
@ -90,6 +91,9 @@ func newResolver(ctx context.Context, discovery registry.Discovery, target *Targ
for {
services, err := watcher.Next()
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
r.logger.Errorf("http client watch service %v got unexpected error:=%v", target, err)
time.Sleep(time.Second)
continue
@ -124,6 +128,10 @@ func (r *resolver) update(services []*registry.ServiceInstance) {
}
}
func (r *resolver) Close() error {
return r.watcher.Stop()
}
func parseEndpoint(endpoints []string) (string, string, error) {
for _, e := range endpoints {
u, err := url.Parse(e)

Loading…
Cancel
Save