Merge pull request #3 from bilibili/master

merge
pull/290/head
wuxingzhong 6 years ago committed by GitHub
commit 265707dec1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      .gitignore
  2. 92
      doc/wiki-cn/blademaster-mid.md
  3. 1
      doc/wiki-cn/blademaster-mod.md
  4. 2
      doc/wiki-cn/cache-redis.md
  5. 81
      doc/wiki-cn/database-mysql-orm.md
  6. 3
      doc/wiki-cn/database.md
  7. 2
      doc/wiki-cn/kratos-genbts.md
  8. 37
      doc/wiki-cn/kratos-protoc.md
  9. 20
      doc/wiki-cn/ratelimit.md
  10. 1
      doc/wiki-cn/summary.md
  11. 6
      doc/wiki-cn/warden-quickstart.md
  12. 0
      example/blademaster/middleware/auth/README.md
  13. 0
      example/blademaster/middleware/auth/auth.go
  14. 2
      example/blademaster/middleware/auth/example_test.go
  15. 40
      example/protobuf/api.bm.go
  16. 17
      example/protobuf/api.ecode.go
  17. 1000
      example/protobuf/api.pb.go
  18. 33
      example/protobuf/api.proto
  19. 96
      example/protobuf/api.swagger.json
  20. 3
      example/protobuf/gen.sh
  21. 1
      pkg/database/sql/mysql.go
  22. 75
      pkg/database/sql/sql.go
  23. 6
      pkg/ecode/ecode.go
  24. 6
      pkg/ecode/status.go
  25. 3
      pkg/ecode/status_test.go
  26. 18
      pkg/naming/discovery/discovery.go
  27. 2
      pkg/net/http/blademaster/routergroup.go
  28. 4
      pkg/net/http/blademaster/server.go
  29. 3
      pkg/net/rpc/warden/CHANGELOG.md
  30. 19
      pkg/net/rpc/warden/client.go
  31. 11
      pkg/net/rpc/warden/resolver/resolver.go
  32. 10
      pkg/net/rpc/warden/server_test.go
  33. 8
      pkg/stat/metric/metric.go
  34. 1
      pkg/stat/sys/cpu/cgroupCPU.go
  35. 2
      tool/kratos-protoc/bm.go
  36. 25
      tool/kratos-protoc/ecode.go
  37. 5
      tool/kratos-protoc/main.go
  38. 12
      tool/kratos-protoc/protoc.go
  39. 2
      tool/kratos-protoc/swagger.go
  40. 2
      tool/kratos/template.go
  41. 2
      tool/kratos/version.go
  42. 11
      tool/protobuf/pkg/naming/naming.go
  43. 28
      tool/protobuf/protoc-gen-bm/generator/generator.go
  44. 2
      tool/protobuf/protoc-gen-bswagger/generator.go
  45. 117
      tool/protobuf/protoc-gen-ecode/generator/generator.go
  46. 27
      tool/protobuf/protoc-gen-ecode/generator/generator_test.go
  47. 23
      tool/protobuf/protoc-gen-ecode/main.go

1
.gitignore vendored

@ -28,4 +28,5 @@ tool/kratos-gen-bts/kratos-gen-bts
tool/kratos-gen-mc/kratos-gen-mc
tool/kratos/kratos-protoc/kratos-protoc
tool/kratos/protobuf/protoc-gen-bm/protoc-gen-bm
tool/kratos/protobuf/protoc-gen-ecode/protoc-gen-ecode
tool/kratos/protobuf/protoc-gen-bswagger/protoc-gen-bswagger

@ -4,7 +4,8 @@
# 写自己的中间件
middleware本质上就是一个handler,如下代码:
middleware本质上就是一个handler,接口和方法声明如下代码:
```go
// Handler responds to an HTTP request.
type Handler interface {
@ -21,7 +22,7 @@ func (f HandlerFunc) ServeHTTP(c *Context) {
```
1. 实现了`Handler`接口,可以作为engine的全局中间件使用:`engine.Use(YourHandler)`
2. 声明为`HandlerFunc`方法,可以作为router的局部中间件使用:`e.GET("/path", YourHandlerFunc)`
2. 声明为`HandlerFunc`方法,可以作为engine的全局中间件使用:`engine.UseFunc(YourHandlerFunc)`,也可以作为router的局部中间件使用:`e.GET("/path", YourHandlerFunc)`
简单示例代码如下:
@ -42,16 +43,17 @@ d := &Demo{}
e.Use(d)
// HandlerFunc使用如下:
e.UseFunc(d.ServeHTTP)
e.GET("/path", d.ServeHTTP)
// 或者只有方法
myHandler := func(ctx *bm.Context) {
// some code
}
e.UseFunc(myHandler)
e.GET("/path", myHandler)
```
# 全局中间件
在blademaster的`server.go`代码中,有以下代码:
@ -64,19 +66,17 @@ func DefaultServer(conf *ServerConfig) *Engine {
}
```
会默认创建一个bm engine,并注册`Recovery(), Trace(), Logger()`三个middlerware,用于全局handler处理。优先级从前到后。
如果需要自定义默认全局执行的middleware,可以使用`NewServer`方法创建一个无middleware的engine对象。
如果想要将自定义的middleware注册进全局,可以继续调用Use方法如下:
会默认创建一个`bm engine`,并注册`Recovery(), Trace(), Logger()`三个middlerware用于全局handler处理,优先级从前到后。如果想要将自定义的middleware注册进全局,可以继续调用Use方法如下:
```go
engine.Use(YourMiddleware())
```
此方法会将`YourMiddleware`追加到已有的全局middleware后执行。
此方法会将`YourMiddleware`追加到已有的全局middleware后执行。如果需要全部自定义全局执行的middleware,可以使用`NewServer`方法创建一个无middleware的engine对象,然后使用`engine.Use/UseFunc`进行注册。
# 局部中间件
先来看一段示例(代码再pkg/net/http/blademaster/middleware/auth模块下):
先来看一段鉴权伪代码示例([auth示例代码位置](https://github.com/bilibili/kratos/tree/master/example/blademaster/middleware/auth)):
```go
func Example() {
@ -105,28 +105,68 @@ func Example() {
# 内置中间件
## Recovery
代码位于`pkg/net/http/blademaster/recovery.go`内,用于recovery panic。会被`DefaultServer`默认注册,建议使用`NewServer`的话也将其作为首个中间件注册。
## Trace
代码位于`pkg/net/http/blademaster/trace.go`内,用于trace设置,并且实现了`net/http/httptrace`的接口,能够收集官方库内的调用栈详情。会被`DefaultServer`默认注册,建议使用`NewServer`的话也将其作为第二个中间件注册。
## Logger
代码位于`pkg/net/http/blademaster/logger.go`内,用于请求日志记录。会被`DefaultServer`默认注册,建议使用`NewServer`的话也将其作为第三个中间件注册。
## CSRF
代码位于`pkg/net/http/blademaster/csrf.go`内,用于防跨站请求。如要使用如下:
```go
e := bm.DefaultServer(nil)
// 挂载自适应限流中间件到 bm engine,使用默认配置
csrf := bm.CSRF([]string{"bilibili.com"}, []string{"/a/api"})
e.Use(csrf)
// 或者
e.GET("/api", csrf, myHandler)
```
## CORS
代码位于`pkg/net/http/blademaster/cors.go`内,用于跨域允许请求。请注意该:
1. 使用该中间件进行全局注册后,可"省略"单独为`OPTIONS`请求注册路由,如示例一。
2. 使用该中间单独为某路由注册,需要为该路由再注册一个`OPTIONS`方法的同路径路由,如示例二。
示例一:
```go
e := bm.DefaultServer(nil)
// 挂载自适应限流中间件到 bm engine,使用默认配置
cors := bm.CORS([]string{"github.com"})
e.Use(cors)
// 该路由可以默认针对 OPTIONS /api 的跨域请求支持
e.POST("/api", myHandler)
```
示例二:
```go
e := bm.DefaultServer(nil)
// 挂载自适应限流中间件到 bm engine,使用默认配置
cors := bm.CORS([]string{"github.com"})
// e.Use(cors) 不进行全局注册
e.OPTIONS("/api", cors, myHandler) // 需要单独为/api进行OPTIONS方法注册
e.POST("/api", cors, myHandler)
```
## 自适应限流
更多关于自适应限流的信息,请参考:[kratos 自适应限流](/doc/wiki-cn/ratelimit.md)
更多关于自适应限流的信息参考:[kratos 自适应限流](/doc/wiki-cn/ratelimit.md)。如要使用如下:
```go
func Example() {
myHandler := func(ctx *bm.Context) {
mid := metadata.Int64(ctx, metadata.Mid)
ctx.JSON(fmt.Sprintf("%d", mid), nil)
}
e := bm.DefaultServer(nil)
// 挂载自适应限流中间件到 bm engine,使用默认配置
limiter := bm.NewRateLimiter(nil)
e.Use(limiter.Limit())
e.GET("/user", myHandler)
e.Start()
}
e := bm.DefaultServer(nil)
// 挂载自适应限流中间件到 bm engine,使用默认配置
limiter := bm.NewRateLimiter(nil)
e.Use(limiter.Limit())
// 或者
e.GET("/api", csrf, myHandler)
```
# 扩展阅读

@ -26,6 +26,7 @@ type Context struct {
```
* 首先可以看到 blademaster 的 Context 结构体中会 embed 一个标准库中的 Context 实例,bm 中的 Context 也是直接通过该实例来实现标准库中的 Context 接口。
* blademaster 会使用配置的 server timeout (默认1s) 作为一次请求整个过程中的超时时间,使用该context调用dao做数据库、缓存操作查询时均会将该超时时间传递下去,一旦抵达deadline,后续相关操作均会返回`context deadline exceeded`。
* Request 和 Writer 字段用于获取当前请求的与输出响应。
* index 和 handlers 用于 handler 的流程控制;handlers 中存储了当前请求需要执行的所有 handler,index 用于标记当前正在执行的 handler 的索引位。
* Keys 用于在 handler 之间传递一些额外的信息。

@ -180,7 +180,7 @@ func (d *Dao) DemoIncrbys(c context.Context, pid int) (err error) {
## 返回值转换
与[memcache包](cache-mc.md)类似地,kratos/pkg/cache/redis包中也提供了Scan方法将redis server的返回值转换为golang类型。
kratos/pkg/cache/redis包中也提供了Scan方法将redis server的返回值转换为golang类型。
除此之外,kratos/pkg/cache/redis包提供了大量返回值转换的快捷方式:

@ -0,0 +1,81 @@
# 准备工作
推荐使用[kratos工具](kratos-tool.md)快速生成项目,如我们生成一个叫`kratos-demo`的项目。目录结构如下:
```
├── CHANGELOG.md
├── CONTRIBUTORS.md
├── LICENSE
├── README.md
├── cmd
   ├── cmd
   └── main.go
├── configs
   ├── application.toml
   ├── grpc.toml
   ├── http.toml
   ├── log.toml
   ├── memcache.toml
   ├── mysql.toml
   └── redis.toml
├── go.mod
├── go.sum
└── internal
├── dao
   └── dao.go
├── model
   └── model.go
├── server
   └── http
   └── http.go
└── service
└── service.go
```
# 开始使用
## 配置
创建项目成功后,进入项目中的configs目录,mysql.toml,我们可以看到:
```toml
[demo]
addr = "127.0.0.1:3306"
dsn = "{user}:{password}@tcp(127.0.0.1:3306)/{database}?timeout=1s&readTimeout=1s&writeTimeout=1s&parseTime=true&loc=Local&charset=utf8mb4,utf8"
readDSN = ["{user}:{password}@tcp(127.0.0.2:3306)/{database}?timeout=1s&readTimeout=1s&writeTimeout=1s&parseTime=true&loc=Local&charset=utf8mb4,utf8","{user}:{password}@tcp(127.0.0.3:3306)/{database}?timeout=1s&readTimeout=1s&writeTimeout=1s&parseTime=true&loc=Local&charset=utf8,utf8mb4"]
active = 20
idle = 10
idleTimeout ="4h"
queryTimeout = "200ms"
execTimeout = "300ms"
tranTimeout = "400ms"
```
在该配置文件中我们可以配置mysql的读和写的dsn、连接地址addr、连接池的闲置连接数idle、最大连接数active以及各类超时。
如果配置了readDSN,在进行读操作的时候会优先使用readDSN的连接。
## 初始化
进入项目的internal/dao目录,打开dao.go,其中:
```go
var (
dc struct {
Demo *sql.Config
}
)
checkErr(paladin.Get("mysql.toml").UnmarshalTOML(&dc))
```
使用paladin配置管理工具将上文中的mysql.toml中的配置解析为我们需要使用mysql的相关配置。
# TODO:补充常用方法
# 扩展阅读
[tidb模块说明](database-tidb.md)
[hbase模块说明](database-hbase.md)
-------------
[文档目录树](summary.md)

@ -6,7 +6,8 @@
## MySQL
MySQL数据库驱动,支持读写分离、context、timeout、trace和统计功能,以及错误熔断防止数据库雪崩。
[mysql client](database-mysql.md)
[mysql client](database-mysql.md)
[mysql client orm](database-mysql-orm.md)
## HBase
HBase客户端,支持trace、slowlog和统计功能。

@ -2,7 +2,7 @@
> 缓存回源代码生成
在internal/dao/dao.go中添加mc缓存interface定义,可以指定对应的[注解参数](../../tool/kratos-gen-mc/README.md);
在internal/dao/dao.go中添加mc缓存interface定义,可以指定对应的[注解参数](../../tool/kratos-gen-bts/README.md);
并且在接口前面添加`go:generate kratos tool genbts`;
然后在当前目录执行`go generate`,可以看到自动生成的dao.bts.go代码。

@ -1,38 +1,31 @@
### kratos tool protoc
```
// generate all
```shell
# generate all
kratos tool protoc api.proto
// generate gRPC
# generate gRPC
kratos tool protoc --grpc api.proto
// generate BM HTTP
# generate BM HTTP
kratos tool protoc --bm api.proto
// generate swagger
# generate ecode
kratos tool protoc --ecode api.proto
# generate swagger
kratos tool protoc --swagger api.proto
```
执行对应生成 `api.pb.go/api.bm.go/api.swagger.json` 源文档。
> 该工具在Windows/Linux下运行,需提前安装好 protobuf 工具
该工具实际是一段`shell`脚本,其中自动将`protoc`命令进行了拼接,识别了需要的`*.proto`文件和当前目录下的`proto`文件,最终会拼接为如下命令进行执行:
```shell
export $KRATOS_HOME = kratos路径
export $KRATOS_DEMO = 项目路径
执行生成如 `api.pb.go/api.bm.go/api.swagger.json/api.ecode.go` 的对应文件,需要注意的是:`ecode`生成有固定规则,需要首先是`enum`类型,且`enum`名字要以`ErrCode`结尾,如`enum UserErrCode`。详情可见:[example](https://github.com/bilibili/kratos/tree/master/example/protobuf)
// 生成:api.pb.go
protoc -I$GOPATH/src:$KRATOS_HOME/third_party:$KRATOS_DEMO/api --gofast_out=plugins=grpc:$KRATOS_DEMO/api $KRATOS_DEMO/api/api.proto
> 该工具在Windows/Linux下运行,需提前安装好 [protobuf](https://github.com/google/protobuf) 工具
// 生成:api.bm.go
protoc -I$GOPATH/src:$KRATOS_HOME/third_party:$KRATOS_DEMO/api --bm_out=$KRATOS_DEMO/api $KRATOS_DEMO/api/api.proto
`kratos tool protoc`本质上是拼接好了`protoc`命令然后进行执行,在执行时会打印出对应执行的`protoc`命令,如下可见:
// 生成:api.swagger.json
protoc -I$GOPATH/src:$KRATOS_HOME/third_party:$KRATOS_DEMO/api --bswagger_out=$KRATOS_DEMO/api $KRATOS_DEMO/api/api.proto
```shell
protoc --proto_path=$GOPATH --proto_path=$GOPATH/github.com/bilibili/kratos/third_party --proto_path=. --bm_out=:. api.proto
protoc --proto_path=$GOPATH --proto_path=$GOPATH/github.com/bilibili/kratos/third_party --proto_path=. --gofast_out=plugins=grpc:. api.proto
protoc --proto_path=$GOPATH --proto_path=$GOPATH/github.com/bilibili/kratos/third_party --proto_path=. --bswagger_out=:. api.proto
protoc --proto_path=$GOPATH --proto_path=$GOPATH/github.com/bilibili/kratos/third_party --proto_path=. --ecode_out=:. api.proto
```
大家也可以参考该命令进行`proto`生成,也可以参考 [protobuf](https://github.com/google/protobuf) 官方参数。
-------------
[文档目录树](summary.md)

@ -11,17 +11,17 @@ kratos 借鉴了 Sentinel 项目的自适应限流系统,通过综合分析服
## 限流规则
1,指标介绍
### 指标介绍
|指标名称|指标含义|
|---|---|
|cpu|最近 1s 的 CPU 使用率均值,使用滑动平均计算,采样周期是 250ms|
|inflight|当前处理中正在处理的请求数量|
|pass|请求处理成功的量|
|rt|请求成功的响应耗时|
| 指标名称 | 指标含义 |
| -------- | ------------------------------------------------------------- |
| cpu | 最近 1s 的 CPU 使用率均值,使用滑动平均计算,采样周期是 250ms |
| inflight | 当前处理中正在处理的请求数量 |
| pass | 请求处理成功的量 |
| rt | 请求成功的响应耗时 |
2,滑动窗口
### 滑动窗口
在自适应限流保护中,采集到的指标的时效性非常强,系统只需要采集最近一小段时间内的 qps、rt 即可,对于较老的数据,会自动丢弃。为了实现这个效果,kratos 使用了滑动窗口来保存采样数据。
@ -31,7 +31,7 @@ kratos 借鉴了 Sentinel 项目的自适应限流系统,通过综合分析服
当时间流动之后,过期的桶会自动被新桶的数据覆盖掉,在图中,在 1000-1500ms 时,bucket 1 的数据因为过期而被丢弃,之后 bucket 3 的数据填到了窗口的头部。
3,限流公式
### 限流公式
判断是否丢弃当前请求的算法如下:
@ -51,7 +51,7 @@ windows 表示一秒内采样窗口的数量,默认配置中是 5s 50 个采
可以看到,没有限流的场景里,系统在 700qps 时开始抖动,在 1k qps 时被拖垮,几乎没有新的请求能被放行,然而在使用限流之后,系统请求能够稳定在 600 qps 左右,rt 没有暴增,服务也没有被打垮,可见,限流有效的保护了服务。
参考资料
## 参考资料
[Sentinel 系统自适应限流](https://github.com/alibaba/Sentinel/wiki/%E7%B3%BB%E7%BB%9F%E8%87%AA%E9%80%82%E5%BA%94%E9%99%90%E6%B5%81)

@ -21,6 +21,7 @@
* [log-agent](log-agent.md)
* [database](database.md)
* [mysql](database-mysql.md)
* [mysql-orm](database-mysql-orm.md)
* [hbase](database-hbase.md)
* [tidb](database-tidb.md)
* [cache](cache.md)

@ -1,6 +1,10 @@
# 准备工作
推荐使用[kratos工具](kratos-tool.md)快速生成带`--grpc`的项目,如我们生成一个叫`kratos-demo`的项目。
推荐使用[kratos工具](kratos-tool.md)快速生成带`grpc`的项目,如我们生成一个叫`kratos-demo`的项目。
```
kratos new kratos-demo --proto
```
# pb文件

@ -4,7 +4,7 @@ import (
"fmt"
bm "github.com/bilibili/kratos/pkg/net/http/blademaster"
"github.com/bilibili/kratos/pkg/net/http/blademaster/middleware/auth"
"github.com/bilibili/kratos/example/blademaster/middleware/auth"
"github.com/bilibili/kratos/pkg/net/metadata"
)

@ -0,0 +1,40 @@
// Code generated by protoc-gen-bm v0.1, DO NOT EDIT.
// source: api.proto
package api
import (
"context"
bm "github.com/bilibili/kratos/pkg/net/http/blademaster"
"github.com/bilibili/kratos/pkg/net/http/blademaster/binding"
)
// to suppressed 'imported but not used warning'
var _ *bm.Context
var _ context.Context
var _ binding.StructValidator
var PathUserInfo = "/user.api.User/Info"
// UserBMServer is the server API for User service.
type UserBMServer interface {
Info(ctx context.Context, req *UserReq) (resp *InfoReply, err error)
}
var UserSvc UserBMServer
func userInfo(c *bm.Context) {
p := new(UserReq)
if err := c.BindWith(p, binding.Default(c.Request.Method, c.Request.Header.Get("Content-Type"))); err != nil {
return
}
resp, err := UserSvc.Info(c, p)
c.JSON(resp, err)
}
// RegisterUserBMServer Register the blademaster route
func RegisterUserBMServer(e *bm.Engine, server UserBMServer) {
UserSvc = server
e.GET("/user.api.User/Info", userInfo)
}

@ -0,0 +1,17 @@
// Code generated by protoc-gen-ecode v0.1, DO NOT EDIT.
// source: api.proto
package api
import (
"github.com/bilibili/kratos/pkg/ecode"
)
// to suppressed 'imported but not used warning'
var _ ecode.Codes
// UserErrCode ecode
var (
UserNotExist = ecode.New(-404)
UserUpdateNameFailed = ecode.New(10000)
)

File diff suppressed because it is too large Load Diff

@ -0,0 +1,33 @@
syntax = "proto3";
package user.api;
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
option go_package = "api";
enum UserErrCode {
OK = 0;
UserNotExist = -404;
UserUpdateNameFailed = 10000;
}
message Info {
int64 mid = 1 [(gogoproto.jsontag) = "mid"];
string name = 2 [(gogoproto.jsontag) = "name"];
string sex = 3 [(gogoproto.jsontag) = "sex"];
string face = 4 [(gogoproto.jsontag) = "face"];
string sign = 5 [(gogoproto.jsontag) = "sign"];
}
message UserReq {
int64 mid = 1 [(gogoproto.moretags) = "validate:\"gt=0,required\""];
}
message InfoReply {
Info info = 1;
}
service User {
rpc Info(UserReq) returns (InfoReply);
}

@ -0,0 +1,96 @@
{
"swagger": "2.0",
"info": {
"title": "api.proto",
"version": ""
},
"schemes": [
"http",
"https"
],
"consumes": [
"application/json",
"multipart/form-data"
],
"produces": [
"application/json"
],
"paths": {
"/user.api.User/Info": {
"get": {
"summary": "/user.api.User/Info",
"responses": {
"200": {
"description": "A successful response.",
"schema": {
"type": "object",
"properties": {
"code": {
"type": "integer"
},
"message": {
"type": "string"
},
"data": {
"$ref": "#/definitions/.user.api.InfoReply"
}
}
}
}
},
"parameters": [
{
"name": "mid",
"in": "query",
"required": true,
"type": "integer"
}
],
"tags": [
"user.api.User"
]
}
}
},
"definitions": {
".user.api.Info": {
"type": "object",
"properties": {
"mid": {
"type": "integer"
},
"name": {
"type": "string"
},
"sex": {
"type": "string"
},
"face": {
"type": "string"
},
"sign": {
"type": "string"
}
}
},
".user.api.InfoReply": {
"type": "object",
"properties": {
"info": {
"$ref": "#/definitions/.user.api.Info"
}
}
},
".user.api.UserReq": {
"type": "object",
"properties": {
"mid": {
"type": "integer"
}
},
"required": [
"mid"
]
}
}
}

@ -0,0 +1,3 @@
#!/bin/bash
kratos tool protoc api.proto

@ -11,7 +11,6 @@ import (
// Config mysql config.
type Config struct {
Addr string // for trace
DSN string // write data source name.
ReadDSN []string // read data source name.
Active int // pool

@ -38,7 +38,7 @@ var (
type DB struct {
write *conn
read []*conn
idx uint64
idx int64
master *DB
}
@ -47,6 +47,7 @@ type conn struct {
*sql.DB
breaker breaker.Breaker
conf *Config
addr string
}
// Tx transaction.
@ -130,17 +131,19 @@ func Open(c *Config) (*DB, error) {
if err != nil {
return nil, err
}
addr := parseDSNAddr(c.DSN)
brkGroup := breaker.NewGroup(c.Breaker)
brk := brkGroup.Get(c.Addr)
w := &conn{DB: d, breaker: brk, conf: c}
brk := brkGroup.Get(addr)
w := &conn{DB: d, breaker: brk, conf: c, addr: addr}
rs := make([]*conn, 0, len(c.ReadDSN))
for _, rd := range c.ReadDSN {
d, err := connect(c, rd)
if err != nil {
return nil, err
}
brk := brkGroup.Get(parseDSNAddr(rd))
r := &conn{DB: d, breaker: brk, conf: c}
addr = parseDSNAddr(rd)
brk := brkGroup.Get(addr)
r := &conn{DB: d, breaker: brk, conf: c, addr: addr}
rs = append(rs, r)
}
db.write = w
@ -193,7 +196,7 @@ func (db *DB) Prepared(query string) (stmt *Stmt) {
func (db *DB) Query(c context.Context, query string, args ...interface{}) (rows *Rows, err error) {
idx := db.readIndex()
for i := range db.read {
if rows, err = db.read[(idx+i)%len(db.read)].query(c, query, args...); !ecode.ServiceUnavailable.Equal(err) {
if rows, err = db.read[(idx+i)%len(db.read)].query(c, query, args...); !ecode.EqualError(ecode.ServiceUnavailable, err) {
return
}
}
@ -206,7 +209,7 @@ func (db *DB) Query(c context.Context, query string, args ...interface{}) (rows
func (db *DB) QueryRow(c context.Context, query string, args ...interface{}) *Row {
idx := db.readIndex()
for i := range db.read {
if row := db.read[(idx+i)%len(db.read)].queryRow(c, query, args...); !ecode.ServiceUnavailable.Equal(row.err) {
if row := db.read[(idx+i)%len(db.read)].queryRow(c, query, args...); !ecode.EqualError(ecode.ServiceUnavailable, row.err) {
return row
}
}
@ -217,8 +220,8 @@ func (db *DB) readIndex() int {
if len(db.read) == 0 {
return 0
}
v := atomic.AddUint64(&db.idx, 1)
return int(v % uint64(len(db.read)))
v := atomic.AddInt64(&db.idx, 1)
return int(v) % len(db.read)
}
// Close closes the write and read database, releasing any open resources.
@ -271,7 +274,7 @@ func (db *conn) begin(c context.Context) (tx *Tx, err error) {
t, ok := trace.FromContext(c)
if ok {
t = t.Fork(_family, "begin")
t.SetTag(trace.String(trace.TagAddress, db.conf.Addr), trace.String(trace.TagComment, ""))
t.SetTag(trace.String(trace.TagAddress, db.addr), trace.String(trace.TagComment, ""))
defer func() {
if err != nil {
t.Finish(&err)
@ -279,12 +282,12 @@ func (db *conn) begin(c context.Context) (tx *Tx, err error) {
}()
}
if err = db.breaker.Allow(); err != nil {
_metricReqErr.Inc(db.conf.Addr, db.conf.Addr, "begin", "breaker")
_metricReqErr.Inc(db.addr, db.addr, "begin", "breaker")
return
}
_, c, cancel := db.conf.TranTimeout.Shrink(c)
rtx, err := db.BeginTx(c, nil)
_metricReqDur.Observe(int64(time.Since(now)/time.Millisecond), db.conf.Addr, db.conf.Addr, "begin")
_metricReqDur.Observe(int64(time.Since(now)/time.Millisecond), db.addr, db.addr, "begin")
if err != nil {
err = errors.WithStack(err)
cancel()
@ -299,18 +302,18 @@ func (db *conn) exec(c context.Context, query string, args ...interface{}) (res
defer slowLog(fmt.Sprintf("Exec query(%s) args(%+v)", query, args), now)
if t, ok := trace.FromContext(c); ok {
t = t.Fork(_family, "exec")
t.SetTag(trace.String(trace.TagAddress, db.conf.Addr), trace.String(trace.TagComment, query))
t.SetTag(trace.String(trace.TagAddress, db.addr), trace.String(trace.TagComment, query))
defer t.Finish(&err)
}
if err = db.breaker.Allow(); err != nil {
_metricReqErr.Inc(db.conf.Addr, db.conf.Addr, "exec", "breaker")
_metricReqErr.Inc(db.addr, db.addr, "exec", "breaker")
return
}
_, c, cancel := db.conf.ExecTimeout.Shrink(c)
res, err = db.ExecContext(c, query, args...)
cancel()
db.onBreaker(&err)
_metricReqDur.Observe(int64(time.Since(now)/time.Millisecond), db.conf.Addr, db.conf.Addr, "exec")
_metricReqDur.Observe(int64(time.Since(now)/time.Millisecond), db.addr, db.addr, "exec")
if err != nil {
err = errors.Wrapf(err, "exec:%s, args:%+v", query, args)
}
@ -322,18 +325,18 @@ func (db *conn) ping(c context.Context) (err error) {
defer slowLog("Ping", now)
if t, ok := trace.FromContext(c); ok {
t = t.Fork(_family, "ping")
t.SetTag(trace.String(trace.TagAddress, db.conf.Addr), trace.String(trace.TagComment, ""))
t.SetTag(trace.String(trace.TagAddress, db.addr), trace.String(trace.TagComment, ""))
defer t.Finish(&err)
}
if err = db.breaker.Allow(); err != nil {
_metricReqErr.Inc(db.conf.Addr, db.conf.Addr, "ping", "breaker")
_metricReqErr.Inc(db.addr, db.addr, "ping", "breaker")
return
}
_, c, cancel := db.conf.ExecTimeout.Shrink(c)
err = db.PingContext(c)
cancel()
db.onBreaker(&err)
_metricReqDur.Observe(int64(time.Since(now)/time.Millisecond), db.conf.Addr, db.conf.Addr, "ping")
_metricReqDur.Observe(int64(time.Since(now)/time.Millisecond), db.addr, db.addr, "ping")
if err != nil {
err = errors.WithStack(err)
}
@ -379,17 +382,17 @@ func (db *conn) query(c context.Context, query string, args ...interface{}) (row
defer slowLog(fmt.Sprintf("Query query(%s) args(%+v)", query, args), now)
if t, ok := trace.FromContext(c); ok {
t = t.Fork(_family, "query")
t.SetTag(trace.String(trace.TagAddress, db.conf.Addr), trace.String(trace.TagComment, query))
t.SetTag(trace.String(trace.TagAddress, db.addr), trace.String(trace.TagComment, query))
defer t.Finish(&err)
}
if err = db.breaker.Allow(); err != nil {
_metricReqErr.Inc(db.conf.Addr, db.conf.Addr, "query", "breaker")
_metricReqErr.Inc(db.addr, db.addr, "query", "breaker")
return
}
_, c, cancel := db.conf.QueryTimeout.Shrink(c)
rs, err := db.DB.QueryContext(c, query, args...)
db.onBreaker(&err)
_metricReqDur.Observe(int64(time.Since(now)/time.Millisecond), db.conf.Addr, db.conf.Addr, "query")
_metricReqDur.Observe(int64(time.Since(now)/time.Millisecond), db.addr, db.addr, "query")
if err != nil {
err = errors.Wrapf(err, "query:%s, args:%+v", query, args)
cancel()
@ -405,15 +408,15 @@ func (db *conn) queryRow(c context.Context, query string, args ...interface{}) *
t, ok := trace.FromContext(c)
if ok {
t = t.Fork(_family, "queryrow")
t.SetTag(trace.String(trace.TagAddress, db.conf.Addr), trace.String(trace.TagComment, query))
t.SetTag(trace.String(trace.TagAddress, db.addr), trace.String(trace.TagComment, query))
}
if err := db.breaker.Allow(); err != nil {
_metricReqErr.Inc(db.conf.Addr, db.conf.Addr, "queryRow", "breaker")
_metricReqErr.Inc(db.addr, db.addr, "queryRow", "breaker")
return &Row{db: db, t: t, err: err}
}
_, c, cancel := db.conf.QueryTimeout.Shrink(c)
r := db.DB.QueryRowContext(c, query, args...)
_metricReqDur.Observe(int64(time.Since(now)/time.Millisecond), db.conf.Addr, db.conf.Addr, "queryrow")
_metricReqDur.Observe(int64(time.Since(now)/time.Millisecond), db.addr, db.addr, "queryrow")
return &Row{db: db, Row: r, query: query, args: args, t: t, cancel: cancel}
}
@ -445,11 +448,11 @@ func (s *Stmt) Exec(c context.Context, args ...interface{}) (res sql.Result, err
}
} else if t, ok := trace.FromContext(c); ok {
t = t.Fork(_family, "exec")
t.SetTag(trace.String(trace.TagAddress, s.db.conf.Addr), trace.String(trace.TagComment, s.query))
t.SetTag(trace.String(trace.TagAddress, s.db.addr), trace.String(trace.TagComment, s.query))
defer t.Finish(&err)
}
if err = s.db.breaker.Allow(); err != nil {
_metricReqErr.Inc(s.db.conf.Addr, s.db.conf.Addr, "stmt:exec", "breaker")
_metricReqErr.Inc(s.db.addr, s.db.addr, "stmt:exec", "breaker")
return
}
stmt, ok := s.stmt.Load().(*sql.Stmt)
@ -461,7 +464,7 @@ func (s *Stmt) Exec(c context.Context, args ...interface{}) (res sql.Result, err
res, err = stmt.ExecContext(c, args...)
cancel()
s.db.onBreaker(&err)
_metricReqDur.Observe(int64(time.Since(now)/time.Millisecond), s.db.conf.Addr, s.db.conf.Addr, "stmt:exec")
_metricReqDur.Observe(int64(time.Since(now)/time.Millisecond), s.db.addr, s.db.addr, "stmt:exec")
if err != nil {
err = errors.Wrapf(err, "exec:%s, args:%+v", s.query, args)
}
@ -483,11 +486,11 @@ func (s *Stmt) Query(c context.Context, args ...interface{}) (rows *Rows, err er
}
} else if t, ok := trace.FromContext(c); ok {
t = t.Fork(_family, "query")
t.SetTag(trace.String(trace.TagAddress, s.db.conf.Addr), trace.String(trace.TagComment, s.query))
t.SetTag(trace.String(trace.TagAddress, s.db.addr), trace.String(trace.TagComment, s.query))
defer t.Finish(&err)
}
if err = s.db.breaker.Allow(); err != nil {
_metricReqErr.Inc(s.db.conf.Addr, s.db.conf.Addr, "stmt:query", "breaker")
_metricReqErr.Inc(s.db.addr, s.db.addr, "stmt:query", "breaker")
return
}
stmt, ok := s.stmt.Load().(*sql.Stmt)
@ -498,7 +501,7 @@ func (s *Stmt) Query(c context.Context, args ...interface{}) (rows *Rows, err er
_, c, cancel := s.db.conf.QueryTimeout.Shrink(c)
rs, err := stmt.QueryContext(c, args...)
s.db.onBreaker(&err)
_metricReqDur.Observe(int64(time.Since(now)/time.Millisecond), s.db.conf.Addr, s.db.conf.Addr, "stmt:query")
_metricReqDur.Observe(int64(time.Since(now)/time.Millisecond), s.db.addr, s.db.addr, "stmt:query")
if err != nil {
err = errors.Wrapf(err, "query:%s, args:%+v", s.query, args)
cancel()
@ -527,11 +530,11 @@ func (s *Stmt) QueryRow(c context.Context, args ...interface{}) (row *Row) {
}
} else if t, ok := trace.FromContext(c); ok {
t = t.Fork(_family, "queryrow")
t.SetTag(trace.String(trace.TagAddress, s.db.conf.Addr), trace.String(trace.TagComment, s.query))
t.SetTag(trace.String(trace.TagAddress, s.db.addr), trace.String(trace.TagComment, s.query))
row.t = t
}
if row.err = s.db.breaker.Allow(); row.err != nil {
_metricReqErr.Inc(s.db.conf.Addr, s.db.conf.Addr, "stmt:queryrow", "breaker")
_metricReqErr.Inc(s.db.addr, s.db.addr, "stmt:queryrow", "breaker")
return
}
stmt, ok := s.stmt.Load().(*sql.Stmt)
@ -541,7 +544,7 @@ func (s *Stmt) QueryRow(c context.Context, args ...interface{}) (row *Row) {
_, c, cancel := s.db.conf.QueryTimeout.Shrink(c)
row.Row = stmt.QueryRowContext(c, args...)
row.cancel = cancel
_metricReqDur.Observe(int64(time.Since(now)/time.Millisecond), s.db.conf.Addr, s.db.conf.Addr, "stmt:queryrow")
_metricReqDur.Observe(int64(time.Since(now)/time.Millisecond), s.db.addr, s.db.addr, "stmt:queryrow")
return
}
@ -582,7 +585,7 @@ func (tx *Tx) Exec(query string, args ...interface{}) (res sql.Result, err error
tx.t.SetTag(trace.String(trace.TagAnnotation, fmt.Sprintf("exec %s", query)))
}
res, err = tx.tx.ExecContext(tx.c, query, args...)
_metricReqDur.Observe(int64(time.Since(now)/time.Millisecond), tx.db.conf.Addr, tx.db.conf.Addr, "tx:exec")
_metricReqDur.Observe(int64(time.Since(now)/time.Millisecond), tx.db.addr, tx.db.addr, "tx:exec")
if err != nil {
err = errors.Wrapf(err, "exec:%s, args:%+v", query, args)
}
@ -597,7 +600,7 @@ func (tx *Tx) Query(query string, args ...interface{}) (rows *Rows, err error) {
now := time.Now()
defer slowLog(fmt.Sprintf("Query query(%s) args(%+v)", query, args), now)
defer func() {
_metricReqDur.Observe(int64(time.Since(now)/time.Millisecond), tx.db.conf.Addr, tx.db.conf.Addr, "tx:query")
_metricReqDur.Observe(int64(time.Since(now)/time.Millisecond), tx.db.addr, tx.db.addr, "tx:query")
}()
rs, err := tx.tx.QueryContext(tx.c, query, args...)
if err == nil {
@ -618,7 +621,7 @@ func (tx *Tx) QueryRow(query string, args ...interface{}) *Row {
now := time.Now()
defer slowLog(fmt.Sprintf("QueryRow query(%s) args(%+v)", query, args), now)
defer func() {
_metricReqDur.Observe(int64(time.Since(now)/time.Millisecond), tx.db.conf.Addr, tx.db.conf.Addr, "tx:queryrow")
_metricReqDur.Observe(int64(time.Since(now)/time.Millisecond), tx.db.addr, tx.db.addr, "tx:queryrow")
}()
r := tx.tx.QueryRowContext(tx.c, query, args...)
return &Row{Row: r, db: tx.db, query: query, args: args}

@ -9,7 +9,7 @@ import (
)
var (
_messages atomic.Value // NOTE: stored map[string]map[int]string
_messages atomic.Value // NOTE: stored map[int]string
_codes = map[int]struct{}{} // register codes.
)
@ -71,10 +71,6 @@ func (e Code) Message() string {
// Details return details.
func (e Code) Details() []interface{} { return nil }
// Equal for compatible.
// Deprecated: please use ecode.EqualError.
func (e Code) Equal(err error) bool { return EqualError(e, err) }
// Int parse code int to error.
func Int(i int) Code { return Code(i) }

@ -74,12 +74,6 @@ func (s *Status) WithDetails(pbs ...proto.Message) (*Status, error) {
return s, nil
}
// Equal for compatible.
// Deprecated: please use ecode.EqualError.
func (s *Status) Equal(err error) bool {
return EqualError(s, err)
}
// Proto return origin protobuf message
func (s *Status) Proto() *types.Status {
return s.s

@ -16,9 +16,6 @@ func TestEqual(t *testing.T) {
err2 = Errorf(RequestErr, "test")
)
assert.Equal(t, err1, err2)
assert.True(t, OK.Equal(nil))
assert.True(t, err1.Equal(err2))
assert.False(t, err1.Equal(nil))
assert.True(t, Equal(nil, nil))
}

@ -338,7 +338,7 @@ func (d *Discovery) Register(ctx context.Context, ins *naming.Instance) (cancelF
for {
select {
case <-ticker.C:
if err := d.renew(ctx, ins); err != nil && ecode.NothingFound.Equal(err) {
if err := d.renew(ctx, ins); err != nil && ecode.EqualError(ecode.NothingFound, err) {
_ = d.register(ctx, ins)
}
case <-ctx.Done():
@ -380,7 +380,7 @@ func (d *Discovery) register(ctx context.Context, ins *naming.Instance) (err err
uri, c.Zone, c.Env, ins.AppID, ins.Addrs, err)
return
}
if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) {
if ec := ecode.Int(res.Code); !ecode.Equal(ecode.OK, ec) {
log.Warn("discovery: register client.Get(%v) env(%s) appid(%s) addrs(%v) code(%v)", uri, c.Env, ins.AppID, ins.Addrs, res.Code)
err = ec
return
@ -408,9 +408,9 @@ func (d *Discovery) renew(ctx context.Context, ins *naming.Instance) (err error)
uri, c.Env, ins.AppID, c.Host, err)
return
}
if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) {
if ec := ecode.Int(res.Code); !ecode.Equal(ecode.OK, ec) {
err = ec
if ec.Equal(ecode.NothingFound) {
if ecode.Equal(ecode.NothingFound, ec) {
return
}
log.Error("discovery: renew client.Get(%v) env(%s) appid(%s) hostname(%s) code(%v)",
@ -440,7 +440,7 @@ func (d *Discovery) cancel(ins *naming.Instance) (err error) {
uri, c.Env, ins.AppID, c.Host, err)
return
}
if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) {
if ec := ecode.Int(res.Code); !ecode.Equal(ecode.OK, ec) {
log.Warn("discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) code(%v)",
uri, c.Env, ins.AppID, c.Host, res.Code)
err = ec
@ -484,7 +484,7 @@ func (d *Discovery) set(ctx context.Context, ins *naming.Instance) (err error) {
uri, conf.Zone, conf.Env, ins.AppID, ins.Addrs, err)
return
}
if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) {
if ec := ecode.Int(res.Code); !ecode.Equal(ecode.OK, ec) {
log.Warn("discovery: set client.Get(%v) env(%s) appid(%s) addrs(%v) code(%v)",
uri, conf.Env, ins.AppID, ins.Addrs, res.Code)
err = ec
@ -587,8 +587,8 @@ func (d *Discovery) polls(ctx context.Context) (apps map[string]*naming.Instance
log.Error("discovery: client.Get(%s) error(%+v)", uri+"?"+params.Encode(), err)
return
}
if ec := ecode.Int(res.Code); !ec.Equal(ecode.OK) {
if !ec.Equal(ecode.NotModified) {
if ec := ecode.Int(res.Code); !ecode.Equal(ecode.OK, ec) {
if !ecode.Equal(ecode.NotModified, ec) {
log.Error("discovery: client.Get(%s) get error code(%d)", uri+"?"+params.Encode(), res.Code)
err = ec
}
@ -611,7 +611,7 @@ func (d *Discovery) broadcast(apps map[string]*naming.InstancesInfo) {
for appID, v := range apps {
var count int
// v maybe nil in old version(less than v1.1) discovery,check incase of panic
if v==nil {
if v == nil {
continue
}
for zone, ins := range v.Instances {

@ -134,7 +134,6 @@ func (group *RouterGroup) HEAD(relativePath string, handlers ...HandlerFunc) IRo
return group.handle("HEAD", relativePath, handlers...)
}
func (group *RouterGroup) combineHandlers(handlerGroups ...[]HandlerFunc) []HandlerFunc {
finalSize := len(group.Handlers)
for _, handlers := range handlerGroups {
@ -190,4 +189,3 @@ func (group *RouterGroup) Any(relativePath string, handlers ...HandlerFunc) IRou
group.handle("TRACE", relativePath, handlers...)
return group.returnObj()
}

@ -206,7 +206,7 @@ func (engine *Engine) SetMethodConfig(path string, mc *MethodConfig) {
engine.pcLock.Unlock()
}
// DefaultServer returns an Engine instance with the Recovery, Logger and CSRF middleware already attached.
// DefaultServer returns an Engine instance with the Recovery and Logger middleware already attached.
func DefaultServer(conf *ServerConfig) *Engine {
engine := NewServer(conf)
engine.Use(Recovery(), Trace(), Logger())
@ -383,6 +383,8 @@ func (engine *Engine) UseFunc(middleware ...HandlerFunc) IRoutes {
// For example, this is the right place for a logger or error management middleware.
func (engine *Engine) Use(middleware ...Handler) IRoutes {
engine.RouterGroup.Use(middleware...)
engine.rebuild404Handlers()
engine.rebuild405Handlers()
return engine
}

@ -1,5 +1,8 @@
### net/rpc/warden
##### Version 1.1.21
1. fix resolver bug
##### Version 1.1.20
1. client增加timeoutCallOpt强制覆盖每次请求的timeout

@ -3,8 +3,6 @@ package warden
import (
"context"
"fmt"
"github.com/bilibili/kratos/pkg/net/rpc/warden/resolver"
"github.com/bilibili/kratos/pkg/net/rpc/warden/resolver/direct"
"net/url"
"os"
"strconv"
@ -12,6 +10,9 @@ import (
"sync"
"time"
"github.com/bilibili/kratos/pkg/net/rpc/warden/resolver"
"github.com/bilibili/kratos/pkg/net/rpc/warden/resolver/direct"
"github.com/bilibili/kratos/pkg/conf/env"
"github.com/bilibili/kratos/pkg/conf/flagvar"
"github.com/bilibili/kratos/pkg/ecode"
@ -84,14 +85,15 @@ type Client struct {
handlers []grpc.UnaryClientInterceptor
}
type TimeOutCallOption struct {
// TimeoutCallOption timeout option.
type TimeoutCallOption struct {
*grpc.EmptyCallOption
Timeout time.Duration
}
// WithTimeoutCallOption can override the timeout in ctx and the timeout in the configuration file
func WithTimeoutCallOption(timeout time.Duration) *TimeOutCallOption {
return &TimeOutCallOption{&grpc.EmptyCallOption{}, timeout}
func WithTimeoutCallOption(timeout time.Duration) *TimeoutCallOption {
return &TimeoutCallOption{&grpc.EmptyCallOption{}, timeout}
}
// handle returns a new unary client interceptor for OpenTracing\Logging\LinkTimeout.
@ -127,10 +129,10 @@ func (c *Client) handle() grpc.UnaryClientInterceptor {
return
}
defer onBreaker(brk, &err)
var timeOpt *TimeOutCallOption
var timeOpt *TimeoutCallOption
for _, opt := range opts {
var tok bool
timeOpt, tok = opt.(*TimeOutCallOption)
timeOpt, tok = opt.(*TimeoutCallOption)
if tok {
break
}
@ -173,9 +175,10 @@ func (c *Client) handle() grpc.UnaryClientInterceptor {
func onBreaker(breaker breaker.Breaker, err *error) {
if err != nil && *err != nil {
if ecode.ServerErr.Equal(*err) || ecode.ServiceUnavailable.Equal(*err) || ecode.Deadline.Equal(*err) || ecode.LimitExceed.Equal(*err) {
if ecode.EqualError(ecode.ServerErr, *err) || ecode.EqualError(ecode.ServiceUnavailable, *err) || ecode.EqualError(ecode.Deadline, *err) || ecode.EqualError(ecode.LimitExceed, *err) {
breaker.MarkFailed()
return
}
}
breaker.MarkSuccess()

@ -129,18 +129,23 @@ func (r *Resolver) updateproc() {
}
}
if ins, ok := r.nr.Fetch(context.Background()); ok {
instances, ok := ins.Instances[r.zone]
if !ok {
instances, _ := ins.Instances[r.zone]
res := r.filter(instances)
if len(res) == 0 {
for _, value := range ins.Instances {
instances = append(instances, value...)
}
res = r.filter(instances)
}
r.newAddress(r.filter(instances))
r.newAddress(res)
}
}
}
func (r *Resolver) filter(backends []*naming.Instance) (instances []*naming.Instance) {
if len(backends) == 0 {
return
}
for _, ins := range backends {
//如果r.clusters的长度大于0说明需要进行集群选择
if _, ok := r.clusters[ins.Metadata[naming.MetaCluster]]; !ok && len(r.clusters) > 0 {

@ -220,7 +220,7 @@ func Test_Warden(t *testing.T) {
func testValidation(t *testing.T) {
_, err := runClient(context.Background(), &clientConfig, t, "", 0)
if !ecode.RequestErr.Equal(err) {
if !ecode.EqualError(ecode.RequestErr, err) {
t.Fatalf("testValidation should return ecode.RequestErr,but is %v", err)
}
}
@ -296,7 +296,7 @@ func testBreaker(t *testing.T) {
for i := 0; i < 50; i++ {
_, err := c.SayHello(context.Background(), &pb.HelloRequest{Name: "breaker_test"})
if err != nil {
if ecode.ServiceUnavailable.Equal(err) {
if ecode.EqualError(ecode.ServiceUnavailable, err) {
return
}
}
@ -334,7 +334,7 @@ func testLinkTimeout(t *testing.T) {
if err == nil {
t.Fatalf("testLinkTimeout must return error")
}
if !ecode.Deadline.Equal(err) {
if !ecode.EqualError(ecode.Deadline, err) {
t.Fatalf("testLinkTimeout must return error RPCDeadline,err:%v", err)
}
@ -344,7 +344,7 @@ func testClientConfig(t *testing.T) {
if err == nil {
t.Fatalf("testLinkTimeout must return error")
}
if !ecode.Deadline.Equal(err) {
if !ecode.EqualError(ecode.Deadline, err) {
t.Fatalf("testLinkTimeout must return error RPCDeadline,err:%v", err)
}
}
@ -397,7 +397,7 @@ func testClientRecovery(t *testing.T) {
t.Fatalf("recovery must return ecode error")
}
if !ecode.ServerErr.Equal(e) {
if !ecode.EqualError(ecode.ServerErr, e) {
t.Fatalf("recovery must return ecode.RPCClientErr")
}
}

@ -1,6 +1,9 @@
package metric
import "errors"
import (
"errors"
"fmt"
)
// Opts contains the common arguments for creating Metric.
type Opts struct {
@ -62,6 +65,7 @@ func NewBusinessMetricCount(name string, labels ...string) CounterVec {
Subsystem: _businessSubsystemCount,
Name: name,
Labels: labels,
Help: fmt.Sprintf("business metric count %s", name),
})
}
@ -76,6 +80,7 @@ func NewBusinessMetricGauge(name string, labels ...string) GaugeVec {
Subsystem: _businessSubSystemGauge,
Name: name,
Labels: labels,
Help: fmt.Sprintf("business metric gauge %s", name),
})
}
@ -94,5 +99,6 @@ func NewBusinessMetricHistogram(name string, buckets []float64, labels ...string
Name: name,
Labels: labels,
Buckets: buckets,
Help: fmt.Sprintf("business metric histogram %s", name),
})
}

@ -56,6 +56,7 @@ func newCgroupCPU() (cpu *cgroupCPU, err error) {
preTotal, err := totalCPUUsage()
if err != nil {
err = errors.Errorf("totalCPUUsage() failed!err:=%v", err)
return
}
cpu = &cgroupCPU{
frequency: maxFreq,

@ -8,7 +8,7 @@ import (
const (
_getBMGen = "go get -u github.com/bilibili/kratos/tool/protobuf/protoc-gen-bm"
_bmProtoc = "protoc --proto_path=%s --proto_path=%s --proto_path=%s --bm_out=explicit_http=true:."
_bmProtoc = "protoc --proto_path=%s --proto_path=%s --proto_path=%s --bm_out=:."
)
func installBMGen() error {

@ -0,0 +1,25 @@
package main
import (
"os/exec"
"github.com/urfave/cli"
)
const (
_getEcodeGen = "go get -u github.com/bilibili/kratos/tool/protobuf/protoc-gen-ecode"
_ecodeProtoc = "protoc --proto_path=%s --proto_path=%s --proto_path=%s --ecode_out=:."
)
func installEcodeGen() error {
if _, err := exec.LookPath("protoc-gen-ecode"); err != nil {
if err := goget(_getEcodeGen); err != nil {
return err
}
}
return nil
}
func genEcode(ctx *cli.Context) error {
return generate(ctx, _ecodeProtoc)
}

@ -27,6 +27,11 @@ func main() {
Usage: "whether to use swagger for generation",
Destination: &withSwagger,
},
cli.BoolFlag{
Name: "ecode",
Usage: "whether to use ecode for generation",
Destination: &withEcode,
},
}
app.Action = func(c *cli.Context) error {
return protocAction(c)

@ -20,16 +20,18 @@ var (
withBM bool
withGRPC bool
withSwagger bool
withEcode bool
)
func protocAction(ctx *cli.Context) (err error) {
if err = checkProtoc(); err != nil {
return err
}
if !withGRPC && !withBM && !withSwagger {
if !withGRPC && !withBM && !withSwagger && !withEcode {
withBM = true
withGRPC = true
withSwagger = true
withEcode = true
}
if withBM {
if err = installBMGen(); err != nil {
@ -55,6 +57,14 @@ func protocAction(ctx *cli.Context) (err error) {
return
}
}
if withEcode {
if err = installEcodeGen(); err != nil {
return
}
if err = genEcode(ctx); err != nil {
return
}
}
log.Printf("generate %v success.\n", ctx.Args())
return nil
}

@ -8,7 +8,7 @@ import (
const (
_getSwaggerGen = "go get -u github.com/bilibili/kratos/tool/protobuf/protoc-gen-bswagger"
_swaggerProtoc = "protoc --proto_path=%s --proto_path=%s --proto_path=%s --bswagger_out=explicit_http=true:."
_swaggerProtoc = "protoc --proto_path=%s --proto_path=%s --proto_path=%s --bswagger_out=:."
)
func installSwaggerGen() error {

@ -546,7 +546,7 @@ type Kratos struct {
go 1.12
require (
github.com/bilibili/kratos v0.2.1
github.com/bilibili/kratos v0.2.2
github.com/gogo/protobuf v1.2.1
github.com/golang/protobuf v1.3.2
golang.org/x/net v0.0.0-20190628185345-da137c7871d7

@ -8,7 +8,7 @@ import (
var (
// Version is version
Version = "0.2.1"
Version = "0.2.2"
// BuildTime is BuildTime
BuildTime = "2019/07/24"
)

@ -2,6 +2,7 @@ package naming
import (
"os"
"path"
"path/filepath"
"strings"
@ -25,6 +26,16 @@ func GetVersionPrefix(pkg string) string {
return ""
}
// GenFileName returns the output name for the generated Go file.
func GenFileName(f *descriptor.FileDescriptorProto, suffix string) string {
name := *f.Name
if ext := path.Ext(name); ext == ".pb" || ext == ".proto" || ext == ".protodevel" {
name = name[:len(name)-len(ext)]
}
name += suffix
return name
}
func ServiceName(service *descriptor.ServiceDescriptorProto) string {
return utils.CamelCase(service.GetName())
}

@ -44,9 +44,6 @@ func (t *bm) Generate(in *plugin.CodeGeneratorRequest) *plugin.CodeGeneratorResp
func (t *bm) generateForFile(file *descriptor.FileDescriptorProto) *plugin.CodeGeneratorResponse_File {
resp := new(plugin.CodeGeneratorResponse_File)
//if len(file.Service) == 0 {
// return nil
//}
t.generateFileHeader(file, t.GenPkgName)
t.generateImports(file)
@ -56,11 +53,8 @@ func (t *bm) generateForFile(file *descriptor.FileDescriptorProto) *plugin.CodeG
count += t.generateBMInterface(file, service)
t.generateBMRoute(file, service, i)
}
//if count == 0 {
// return nil
//}
resp.Name = proto.String(naming.GoFileName(file, ".bm.go"))
resp.Name = proto.String(naming.GenFileName(file, ".bm.go"))
resp.Content = proto.String(t.FormattedOutput())
t.Output.Reset()
@ -88,13 +82,13 @@ func (t *bm) generateFileHeader(file *descriptor.FileDescriptorProto, pkgName st
t.P("// source: ", file.GetName())
t.P()
if t.filesHandled == 0 {
// doc for the first file
t.P("/*")
t.P("Package ", t.GenPkgName, " is a generated blademaster stub package.")
t.P("This code was generated with kratos/tool/protobuf/protoc-gen-bm ", generator.Version, ".")
t.P()
comment, err := t.Reg.FileComments(file)
if err == nil && comment.Leading != "" {
// doc for the first file
t.P("/*")
t.P("Package ", t.GenPkgName, " is a generated blademaster stub package.")
t.P("This code was generated with kratos/tool/protobuf/protoc-gen-bm ", generator.Version, ".")
t.P()
for _, line := range strings.Split(comment.Leading, "\n") {
line = strings.TrimPrefix(line, " ")
// ensure we don't escape from the block comment
@ -102,12 +96,12 @@ func (t *bm) generateFileHeader(file *descriptor.FileDescriptorProto, pkgName st
t.P(line)
}
t.P()
t.P("It is generated from these files:")
for _, f := range t.GenFiles {
t.P("\t", f.GetName())
}
t.P("*/")
}
t.P("It is generated from these files:")
for _, f := range t.GenFiles {
t.P("\t", f.GetName())
}
t.P("*/")
}
t.P(`package `, pkgName)
t.P()

@ -66,7 +66,7 @@ func (t *swaggerGen) generateSwagger(file *descriptor.FileDescriptorProto) *plug
t.defsMap = map[string]*typemap.MessageDefinition{}
out := &plugin.CodeGeneratorResponse_File{}
name := naming.GoFileName(file, ".swagger.json")
name := naming.GenFileName(file, ".swagger.json")
for _, svc := range file.Service {
for _, meth := range svc.Method {
if !t.ShouldGenForMethod(file, svc, meth) {

@ -0,0 +1,117 @@
package generator
import (
"strconv"
"strings"
"github.com/bilibili/kratos/tool/protobuf/pkg/generator"
"github.com/bilibili/kratos/tool/protobuf/pkg/naming"
"github.com/golang/protobuf/proto"
"github.com/golang/protobuf/protoc-gen-go/descriptor"
plugin "github.com/golang/protobuf/protoc-gen-go/plugin"
)
type ecode struct {
generator.Base
filesHandled int
}
// EcodeGenerator ecode generator.
func EcodeGenerator() *ecode {
t := &ecode{}
return t
}
// Generate ...
func (t *ecode) Generate(in *plugin.CodeGeneratorRequest) *plugin.CodeGeneratorResponse {
t.Setup(in)
// Showtime! Generate the response.
resp := new(plugin.CodeGeneratorResponse)
for _, f := range t.GenFiles {
respFile := t.generateForFile(f)
if respFile != nil {
resp.File = append(resp.File, respFile)
}
}
return resp
}
func (t *ecode) generateForFile(file *descriptor.FileDescriptorProto) *plugin.CodeGeneratorResponse_File {
var enums []*descriptor.EnumDescriptorProto
for _, enum := range file.EnumType {
if strings.HasSuffix(*enum.Name, "ErrCode") {
enums = append(enums, enum)
}
}
if len(enums) == 0 {
return nil
}
resp := new(plugin.CodeGeneratorResponse_File)
t.generateFileHeader(file, t.GenPkgName)
t.generateImports(file)
for _, enum := range enums {
t.generateEcode(file, enum)
}
resp.Name = proto.String(naming.GenFileName(file, ".ecode.go"))
resp.Content = proto.String(t.FormattedOutput())
t.Output.Reset()
t.filesHandled++
return resp
}
func (t *ecode) generateFileHeader(file *descriptor.FileDescriptorProto, pkgName string) {
t.P("// Code generated by protoc-gen-ecode ", generator.Version, ", DO NOT EDIT.")
t.P("// source: ", file.GetName())
t.P()
if t.filesHandled == 0 {
comment, err := t.Reg.FileComments(file)
if err == nil && comment.Leading != "" {
// doc for the first file
t.P("/*")
t.P("Package ", t.GenPkgName, " is a generated ecode package.")
t.P("This code was generated with kratos/tool/protobuf/protoc-gen-ecode ", generator.Version, ".")
t.P()
for _, line := range strings.Split(comment.Leading, "\n") {
line = strings.TrimPrefix(line, " ")
// ensure we don't escape from the block comment
line = strings.Replace(line, "*/", "* /", -1)
t.P(line)
}
t.P()
t.P("It is generated from these files:")
for _, f := range t.GenFiles {
t.P("\t", f.GetName())
}
t.P("*/")
}
}
t.P(`package `, pkgName)
t.P()
}
func (t *ecode) generateImports(file *descriptor.FileDescriptorProto) {
t.P(`import (`)
t.P(` "github.com/bilibili/kratos/pkg/ecode"`)
t.P(`)`)
t.P()
t.P(`// to suppressed 'imported but not used warning'`)
t.P(`var _ ecode.Codes`)
}
func (t *ecode) generateEcode(file *descriptor.FileDescriptorProto, enum *descriptor.EnumDescriptorProto) {
t.P("// ", *enum.Name, " ecode")
t.P("var (")
for _, item := range enum.Value {
if *item.Number == 0 {
continue
}
// NOTE: eg: t.P("UserNotExist = New(-404) ")
t.P(*item.Name, " = ", "ecode.New(", strconv.Itoa(int(*item.Number)), ")")
}
t.P(")")
}

@ -0,0 +1,27 @@
package generator
import (
"os"
"os/exec"
"testing"
"github.com/golang/protobuf/proto"
plugin "github.com/golang/protobuf/protoc-gen-go/plugin"
)
func TestGenerateParseCommandLineParamsError(t *testing.T) {
if os.Getenv("BE_CRASHER") == "1" {
g := &ecode{}
g.Generate(&plugin.CodeGeneratorRequest{
Parameter: proto.String("invalid"),
})
return
}
cmd := exec.Command(os.Args[0], "-test.run=TestGenerateParseCommandLineParamsError")
cmd.Env = append(os.Environ(), "BE_CRASHER=1")
err := cmd.Run()
if e, ok := err.(*exec.ExitError); ok && !e.Success() {
return
}
t.Fatalf("process ran with err %v, want exit status 1", err)
}

@ -0,0 +1,23 @@
package main
import (
"flag"
"fmt"
"os"
"github.com/bilibili/kratos/tool/protobuf/pkg/gen"
"github.com/bilibili/kratos/tool/protobuf/pkg/generator"
ecodegen "github.com/bilibili/kratos/tool/protobuf/protoc-gen-ecode/generator"
)
func main() {
versionFlag := flag.Bool("version", false, "print version and exit")
flag.Parse()
if *versionFlag {
fmt.Println(generator.Version)
os.Exit(0)
}
g := ecodegen.EcodeGenerator()
gen.Main(g)
}
Loading…
Cancel
Save