update warden from internal library

pull/161/head
wei cheng 6 years ago
parent 19200b067d
commit 64495e5362
No known key found for this signature in database
GPG Key ID: 5AF7E34E6A971D7F
  1. 14
      pkg/net/rpc/warden/CHANGELOG.md
  2. 12
      pkg/net/rpc/warden/README.md
  3. 79
      pkg/net/rpc/warden/client.go
  4. 34
      pkg/net/rpc/warden/client_test.go
  5. 2
      pkg/net/rpc/warden/internal/pb/ecode.proto
  6. 94
      pkg/net/rpc/warden/logging.go
  7. 239
      pkg/net/rpc/warden/logging_test.go
  8. 12
      pkg/net/rpc/warden/server.go

@ -1,9 +1,21 @@
### net/rpc/warden ### net/rpc/warden
##### Version 1.1.17
1. 移除 bbr feature flag,默认开启自适应限流
##### Version 1.1.16
1. 使用 flag(grpc.bbr) 绑定 BBR 限流
##### Version 1.1.15
1. warden使用 metadata.Range 方法
##### Version 1.1.14 ##### Version 1.1.14
1. p2c balancer增加filter保护 1. 为 server log 添加选项
##### Version 1.1.13 ##### Version 1.1.13
1. 为 client log 添加选项
##### Version 1.1.12
1. 设置 caller 为 no_user 如果 user 不存在 1. 设置 caller 为 no_user 如果 user 不存在
##### Version 1.1.12 ##### Version 1.1.12

@ -1,5 +1,13 @@
#### net/rpc/warden #### net/rcp/warden
##### 项目简介 ##### 项目简介
gRPC 框架,带来如飞一般的体验。 来自 bilibili 主站技术部的 RPC 框架,融合主站技术部的核心科技,带来如飞一般的体验。
##### 编译环境
- **请只用 Golang v1.9.x 以上版本编译执行**
##### 依赖包
- [grpc](google.golang.org/grpc)

@ -14,12 +14,12 @@ import (
"github.com/bilibili/kratos/pkg/conf/flagvar" "github.com/bilibili/kratos/pkg/conf/flagvar"
"github.com/bilibili/kratos/pkg/ecode" "github.com/bilibili/kratos/pkg/ecode"
"github.com/bilibili/kratos/pkg/naming" "github.com/bilibili/kratos/pkg/naming"
"github.com/bilibili/kratos/pkg/naming/discovery"
nmd "github.com/bilibili/kratos/pkg/net/metadata" nmd "github.com/bilibili/kratos/pkg/net/metadata"
"github.com/bilibili/kratos/pkg/net/netutil/breaker" "github.com/bilibili/kratos/pkg/net/netutil/breaker"
"github.com/bilibili/kratos/pkg/net/rpc/warden/balancer/p2c" "github.com/bilibili/kratos/pkg/net/rpc/warden/balancer/p2c"
"github.com/bilibili/kratos/pkg/net/rpc/warden/internal/status" "github.com/bilibili/kratos/pkg/net/rpc/warden/internal/status"
"github.com/bilibili/kratos/pkg/net/rpc/warden/resolver" "github.com/bilibili/kratos/pkg/net/rpc/warden/resolver"
"github.com/bilibili/kratos/pkg/net/rpc/warden/resolver/direct"
"github.com/bilibili/kratos/pkg/net/trace" "github.com/bilibili/kratos/pkg/net/trace"
xtime "github.com/bilibili/kratos/pkg/time" xtime "github.com/bilibili/kratos/pkg/time"
@ -51,10 +51,6 @@ func baseMetadata() metadata.MD {
return gmd return gmd
} }
func init() {
resolver.Register(direct.New())
}
// ClientConfig is rpc client conf. // ClientConfig is rpc client conf.
type ClientConfig struct { type ClientConfig struct {
Dial xtime.Duration Dial xtime.Duration
@ -74,7 +70,7 @@ type Client struct {
breaker *breaker.Group breaker *breaker.Group
mutex sync.RWMutex mutex sync.RWMutex
opt []grpc.DialOption opts []grpc.DialOption
handlers []grpc.UnaryClientInterceptor handlers []grpc.UnaryClientInterceptor
} }
@ -160,19 +156,20 @@ func NewConn(target string, opt ...grpc.DialOption) (*grpc.ClientConn, error) {
// NewClient returns a new blank Client instance with a default client interceptor. // NewClient returns a new blank Client instance with a default client interceptor.
// opt can be used to add grpc dial options. // opt can be used to add grpc dial options.
func NewClient(conf *ClientConfig, opt ...grpc.DialOption) *Client { func NewClient(conf *ClientConfig, opt ...grpc.DialOption) *Client {
resolver.Register(discovery.Builder())
c := new(Client) c := new(Client)
if err := c.SetConfig(conf); err != nil { if err := c.SetConfig(conf); err != nil {
panic(err) panic(err)
} }
c.UseOpt(grpc.WithBalancerName(p2c.Name)) c.UseOpt(grpc.WithBalancerName(p2c.Name))
c.UseOpt(opt...) c.UseOpt(opt...)
c.Use(c.recovery(), clientLogging(), c.handle())
return c return c
} }
// DefaultClient returns a new default Client instance with a default client interceptor and default dialoption. // DefaultClient returns a new default Client instance with a default client interceptor and default dialoption.
// opt can be used to add grpc dial options. // opt can be used to add grpc dial options.
func DefaultClient() *Client { func DefaultClient() *Client {
resolver.Register(discovery.Builder())
_once.Do(func() { _once.Do(func() {
_defaultClient = NewClient(nil) _defaultClient = NewClient(nil)
}) })
@ -221,21 +218,33 @@ func (c *Client) Use(handlers ...grpc.UnaryClientInterceptor) *Client {
} }
// UseOpt attachs a global grpc DialOption to the Client. // UseOpt attachs a global grpc DialOption to the Client.
func (c *Client) UseOpt(opt ...grpc.DialOption) *Client { func (c *Client) UseOpt(opts ...grpc.DialOption) *Client {
c.opt = append(c.opt, opt...) c.opts = append(c.opts, opts...)
return c return c
} }
// Dial creates a client connection to the given target. func (c *Client) cloneOpts() []grpc.DialOption {
// Target format is scheme://authority/endpoint?query_arg=value dialOptions := make([]grpc.DialOption, len(c.opts))
// example: direct://default/192.168.1.1:9000,192.168.1.2:9001 copy(dialOptions, c.opts)
func (c *Client) Dial(ctx context.Context, target string, opt ...grpc.DialOption) (conn *grpc.ClientConn, err error) { return dialOptions
}
func (c *Client) dial(ctx context.Context, target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
dialOptions := c.cloneOpts()
if !c.conf.NonBlock { if !c.conf.NonBlock {
c.opt = append(c.opt, grpc.WithBlock()) dialOptions = append(dialOptions, grpc.WithBlock())
} }
c.opt = append(c.opt, grpc.WithInsecure()) dialOptions = append(dialOptions, opts...)
c.opt = append(c.opt, grpc.WithUnaryInterceptor(c.chainUnaryClient()))
c.opt = append(c.opt, opt...) // init default handler
var handlers []grpc.UnaryClientInterceptor
handlers = append(handlers, c.recovery())
handlers = append(handlers, clientLogging(dialOptions...))
handlers = append(handlers, c.handlers...)
// NOTE: c.handle must be a last interceptor.
handlers = append(handlers, c.handle())
dialOptions = append(dialOptions, grpc.WithUnaryInterceptor(chainUnaryClient(handlers)))
c.mutex.RLock() c.mutex.RLock()
conf := c.conf conf := c.conf
c.mutex.RUnlock() c.mutex.RUnlock()
@ -268,43 +277,39 @@ func (c *Client) Dial(ctx context.Context, target string, opt ...grpc.DialOption
} }
target = u.String() target = u.String()
} }
if conn, err = grpc.DialContext(ctx, target, c.opt...); err != nil { if conn, err = grpc.DialContext(ctx, target, dialOptions...); err != nil {
fmt.Fprintf(os.Stderr, "warden client: dial %s error %v!", target, err) fmt.Fprintf(os.Stderr, "warden client: dial %s error %v!", target, err)
} }
err = errors.WithStack(err) err = errors.WithStack(err)
return return
} }
// Dial creates a client connection to the given target.
// Target format is scheme://authority/endpoint?query_arg=value
// example: discovery://default/account.account.service?cluster=shfy01&cluster=shfy02
func (c *Client) Dial(ctx context.Context, target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
opts = append(opts, grpc.WithInsecure())
return c.dial(ctx, target, opts...)
}
// DialTLS creates a client connection over tls transport to the given target. // DialTLS creates a client connection over tls transport to the given target.
func (c *Client) DialTLS(ctx context.Context, target string, file string, name string) (conn *grpc.ClientConn, err error) { func (c *Client) DialTLS(ctx context.Context, target string, file string, name string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) {
var creds credentials.TransportCredentials var creds credentials.TransportCredentials
creds, err = credentials.NewClientTLSFromFile(file, name) creds, err = credentials.NewClientTLSFromFile(file, name)
if err != nil { if err != nil {
err = errors.WithStack(err) err = errors.WithStack(err)
return return
} }
c.opt = append(c.opt, grpc.WithBlock()) opts = append(opts, grpc.WithTransportCredentials(creds))
c.opt = append(c.opt, grpc.WithTransportCredentials(creds)) return c.dial(ctx, target, opts...)
c.opt = append(c.opt, grpc.WithUnaryInterceptor(c.chainUnaryClient()))
c.mutex.RLock()
conf := c.conf
c.mutex.RUnlock()
if conf.Dial > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, time.Duration(conf.Dial))
defer cancel()
}
conn, err = grpc.DialContext(ctx, target, c.opt...)
err = errors.WithStack(err)
return
} }
// chainUnaryClient creates a single interceptor out of a chain of many interceptors. // chainUnaryClient creates a single interceptor out of a chain of many interceptors.
// //
// Execution is done in left-to-right order, including passing of context. // Execution is done in left-to-right order, including passing of context.
// For example ChainUnaryClient(one, two, three) will execute one before two before three. // For example ChainUnaryClient(one, two, three) will execute one before two before three.
func (c *Client) chainUnaryClient() grpc.UnaryClientInterceptor { func chainUnaryClient(handlers []grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor {
n := len(c.handlers) n := len(handlers)
if n == 0 { if n == 0 {
return func(ctx context.Context, method string, req, reply interface{}, return func(ctx context.Context, method string, req, reply interface{},
cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
@ -323,9 +328,9 @@ func (c *Client) chainUnaryClient() grpc.UnaryClientInterceptor {
return invoker(ictx, imethod, ireq, ireply, ic, iopts...) return invoker(ictx, imethod, ireq, ireply, ic, iopts...)
} }
i++ i++
return c.handlers[i](ictx, imethod, ireq, ireply, ic, chainHandler, iopts...) return handlers[i](ictx, imethod, ireq, ireply, ic, chainHandler, iopts...)
} }
return c.handlers[0](ctx, method, req, reply, cc, chainHandler, opts...) return handlers[0](ctx, method, req, reply, cc, chainHandler, opts...)
} }
} }

@ -0,0 +1,34 @@
package warden
import (
"context"
"testing"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
)
func TestChainUnaryClient(t *testing.T) {
var orders []string
factory := func(name string) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
orders = append(orders, name+"-in")
err := invoker(ctx, method, req, reply, cc, opts...)
orders = append(orders, name+"-out")
return err
}
}
handlers := []grpc.UnaryClientInterceptor{factory("h1"), factory("h2"), factory("h3")}
interceptor := chainUnaryClient(handlers)
interceptor(context.Background(), "test", nil, nil, nil, func(context.Context, string, interface{}, interface{}, *grpc.ClientConn, ...grpc.CallOption) error {
return nil
})
assert.Equal(t, []string{
"h1-in",
"h2-in",
"h3-in",
"h3-out",
"h2-out",
"h1-out",
}, orders)
}

@ -4,7 +4,7 @@ package pb;
import "google/protobuf/any.proto"; import "google/protobuf/any.proto";
option go_package = "go-common/library/ecode/pb"; option go_package = "github.com/bilibili/kratos/pkg/net/rpc/warden/internal/pb";
message Error { message Error {
int32 err_code = 1; int32 err_code = 1;

@ -20,6 +20,50 @@ var (
statsServer = stat.RPCServer statsServer = stat.RPCServer
) )
// Warden Log Flag
const (
// disable all log.
LogFlagDisable = 1 << iota
// disable print args on log.
LogFlagDisableArgs
// disable info level log.
LogFlagDisableInfo
)
type logOption struct {
grpc.EmptyDialOption
grpc.EmptyCallOption
flag int8
}
// WithLogFlag disable client access log.
func WithLogFlag(flag int8) grpc.CallOption {
return logOption{flag: flag}
}
// WithDialLogFlag set client level log behaviour.
func WithDialLogFlag(flag int8) grpc.DialOption {
return logOption{flag: flag}
}
func extractLogCallOption(opts []grpc.CallOption) (flag int8) {
for _, opt := range opts {
if logOpt, ok := opt.(logOption); ok {
return logOpt.flag
}
}
return
}
func extractLogDialOption(opts []grpc.DialOption) (flag int8) {
for _, opt := range opts {
if logOpt, ok := opt.(logOption); ok {
return logOpt.flag
}
}
return
}
func logFn(code int, dt time.Duration) func(context.Context, ...log.D) { func logFn(code int, dt time.Duration) func(context.Context, ...log.D) {
switch { switch {
case code < 0: case code < 0:
@ -34,8 +78,11 @@ func logFn(code int, dt time.Duration) func(context.Context, ...log.D) {
} }
// clientLogging warden grpc logging // clientLogging warden grpc logging
func clientLogging() grpc.UnaryClientInterceptor { func clientLogging(dialOptions ...grpc.DialOption) grpc.UnaryClientInterceptor {
defaultFlag := extractLogDialOption(dialOptions)
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
logFlag := extractLogCallOption(opts) | defaultFlag
startTime := time.Now() startTime := time.Now()
var peerInfo peer.Peer var peerInfo peer.Peer
opts = append(opts, grpc.Peer(&peerInfo)) opts = append(opts, grpc.Peer(&peerInfo))
@ -50,21 +97,27 @@ func clientLogging() grpc.UnaryClientInterceptor {
statsClient.Timing(method, int64(duration/time.Millisecond)) statsClient.Timing(method, int64(duration/time.Millisecond))
statsClient.Incr(method, strconv.Itoa(code)) statsClient.Incr(method, strconv.Itoa(code))
var ip string if logFlag&LogFlagDisable != 0 {
return err
}
// TODO: find better way to deal with slow log.
if logFlag&LogFlagDisableInfo != 0 && err == nil && duration < 500*time.Millisecond {
return err
}
logFields := make([]log.D, 0, 7)
logFields = append(logFields, log.KVString("path", method))
logFields = append(logFields, log.KVInt("ret", code))
logFields = append(logFields, log.KVFloat64("ts", duration.Seconds()))
logFields = append(logFields, log.KVString("source", "grpc-access-log"))
if peerInfo.Addr != nil { if peerInfo.Addr != nil {
ip = peerInfo.Addr.String() logFields = append(logFields, log.KVString("ip", peerInfo.Addr.String()))
} }
logFields := []log.D{ if logFlag&LogFlagDisableArgs == 0 {
log.KVString("ip", ip),
log.KVString("path", method),
log.KVInt("ret", code),
// TODO: it will panic if someone remove String method from protobuf message struct that auto generate from protoc. // TODO: it will panic if someone remove String method from protobuf message struct that auto generate from protoc.
log.KVString("args", req.(fmt.Stringer).String()), logFields = append(logFields, log.KVString("args", req.(fmt.Stringer).String()))
log.KVFloat64("ts", duration.Seconds()),
log.KVString("source", "grpc-access-log"),
} }
if err != nil { if err != nil {
logFields = append(logFields, log.KV("error", err.Error()), log.KVString("stack", fmt.Sprintf("%+v", err))) logFields = append(logFields, log.KVString("error", err.Error()), log.KVString("stack", fmt.Sprintf("%+v", err)))
} }
logFn(code, duration)(ctx, logFields...) logFn(code, duration)(ctx, logFields...)
return err return err
@ -72,7 +125,7 @@ func clientLogging() grpc.UnaryClientInterceptor {
} }
// serverLogging warden grpc logging // serverLogging warden grpc logging
func serverLogging() grpc.UnaryServerInterceptor { func serverLogging(logFlag int8) grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
startTime := time.Now() startTime := time.Now()
caller := metadata.String(ctx, metadata.Caller) caller := metadata.String(ctx, metadata.Caller)
@ -94,23 +147,32 @@ func serverLogging() grpc.UnaryServerInterceptor {
// after server response // after server response
code := ecode.Cause(err).Code() code := ecode.Cause(err).Code()
duration := time.Since(startTime) duration := time.Since(startTime)
// monitor // monitor
statsServer.Timing(caller, int64(duration/time.Millisecond), info.FullMethod) statsServer.Timing(caller, int64(duration/time.Millisecond), info.FullMethod)
statsServer.Incr(caller, info.FullMethod, strconv.Itoa(code)) statsServer.Incr(caller, info.FullMethod, strconv.Itoa(code))
if logFlag&LogFlagDisable != 0 {
return resp, err
}
// TODO: find better way to deal with slow log.
if logFlag&LogFlagDisableInfo != 0 && err == nil && duration < 500*time.Millisecond {
return resp, err
}
logFields := []log.D{ logFields := []log.D{
log.KVString("user", caller), log.KVString("user", caller),
log.KVString("ip", remoteIP), log.KVString("ip", remoteIP),
log.KVString("path", info.FullMethod), log.KVString("path", info.FullMethod),
log.KVInt("ret", code), log.KVInt("ret", code),
// TODO: it will panic if someone remove String method from protobuf message struct that auto generate from protoc.
log.KVString("args", req.(fmt.Stringer).String()),
log.KVFloat64("ts", duration.Seconds()), log.KVFloat64("ts", duration.Seconds()),
log.KVFloat64("timeout_quota", quota), log.KVFloat64("timeout_quota", quota),
log.KVString("source", "grpc-access-log"), log.KVString("source", "grpc-access-log"),
} }
if logFlag&LogFlagDisableArgs == 0 {
// TODO: it will panic if someone remove String method from protobuf message struct that auto generate from protoc.
logFields = append(logFields, log.KVString("args", req.(fmt.Stringer).String()))
}
if err != nil { if err != nil {
logFields = append(logFields, log.KV("error", err.Error()), log.KV("stack", fmt.Sprintf("%+v", err))) logFields = append(logFields, log.KVString("error", err.Error()), log.KVString("stack", fmt.Sprintf("%+v", err)))
} }
logFn(code, duration)(ctx, logFields...) logFn(code, duration)(ctx, logFields...)
return resp, err return resp, err

@ -1,11 +1,18 @@
package warden package warden
import ( import (
"bytes"
"context" "context"
"errors"
"io/ioutil"
"os"
"reflect" "reflect"
"testing" "testing"
"time" "time"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
"github.com/bilibili/kratos/pkg/log" "github.com/bilibili/kratos/pkg/log"
) )
@ -53,3 +60,235 @@ func Test_logFn(t *testing.T) {
}) })
} }
} }
func callInterceptor(err error, interceptor grpc.UnaryClientInterceptor, opts ...grpc.CallOption) {
interceptor(context.Background(),
"test-method",
bytes.NewBufferString("test-req"),
"test_reply",
&grpc.ClientConn{},
func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error {
return err
}, opts...)
}
func TestClientLog(t *testing.T) {
stderr, err := ioutil.TempFile(os.TempDir(), "stderr")
if err != nil {
t.Fatal(err)
}
old := os.Stderr
os.Stderr = stderr
t.Logf("capture stderr file: %s", stderr.Name())
t.Run("test no option", func(t *testing.T) {
callInterceptor(nil, clientLogging())
stderr.Seek(0, os.SEEK_SET)
data, err := ioutil.ReadAll(stderr)
if err != nil {
t.Error(err)
}
assert.Contains(t, string(data), "test-method")
assert.Contains(t, string(data), "test-req")
assert.Contains(t, string(data), "path")
assert.Contains(t, string(data), "ret")
assert.Contains(t, string(data), "ts")
assert.Contains(t, string(data), "grpc-access-log")
stderr.Seek(0, os.SEEK_SET)
stderr.Truncate(0)
})
t.Run("test disable args", func(t *testing.T) {
callInterceptor(nil, clientLogging(WithDialLogFlag(LogFlagDisableArgs)))
stderr.Seek(0, os.SEEK_SET)
data, err := ioutil.ReadAll(stderr)
if err != nil {
t.Error(err)
}
assert.Contains(t, string(data), "test-method")
assert.NotContains(t, string(data), "test-req")
stderr.Seek(0, os.SEEK_SET)
stderr.Truncate(0)
})
t.Run("test disable args and disable info", func(t *testing.T) {
callInterceptor(nil, clientLogging(WithDialLogFlag(LogFlagDisableArgs|LogFlagDisableInfo)))
callInterceptor(errors.New("test-error"), clientLogging(WithDialLogFlag(LogFlagDisableArgs|LogFlagDisableInfo)))
stderr.Seek(0, os.SEEK_SET)
data, err := ioutil.ReadAll(stderr)
if err != nil {
t.Error(err)
}
assert.Contains(t, string(data), "test-method")
assert.Contains(t, string(data), "test-error")
assert.NotContains(t, string(data), "INFO")
stderr.Seek(0, os.SEEK_SET)
stderr.Truncate(0)
})
t.Run("test call option", func(t *testing.T) {
callInterceptor(nil, clientLogging(), WithLogFlag(LogFlagDisableArgs))
stderr.Seek(0, os.SEEK_SET)
data, err := ioutil.ReadAll(stderr)
if err != nil {
t.Error(err)
}
assert.Contains(t, string(data), "test-method")
assert.NotContains(t, string(data), "test-req")
stderr.Seek(0, os.SEEK_SET)
stderr.Truncate(0)
})
t.Run("test combine option", func(t *testing.T) {
interceptor := clientLogging(WithDialLogFlag(LogFlagDisableInfo))
callInterceptor(nil, interceptor, WithLogFlag(LogFlagDisableArgs))
callInterceptor(errors.New("test-error"), interceptor, WithLogFlag(LogFlagDisableArgs))
stderr.Seek(0, os.SEEK_SET)
data, err := ioutil.ReadAll(stderr)
if err != nil {
t.Error(err)
}
assert.Contains(t, string(data), "test-method")
assert.Contains(t, string(data), "test-error")
assert.NotContains(t, string(data), "INFO")
stderr.Seek(0, os.SEEK_SET)
stderr.Truncate(0)
})
t.Run("test no log", func(t *testing.T) {
callInterceptor(errors.New("test error"), clientLogging(WithDialLogFlag(LogFlagDisable)))
stderr.Seek(0, os.SEEK_SET)
data, err := ioutil.ReadAll(stderr)
if err != nil {
t.Error(err)
}
assert.Empty(t, data)
stderr.Seek(0, os.SEEK_SET)
stderr.Truncate(0)
})
t.Run("test multi flag", func(t *testing.T) {
interceptor := clientLogging(WithDialLogFlag(LogFlagDisableInfo | LogFlagDisableArgs))
callInterceptor(nil, interceptor)
callInterceptor(errors.New("test-error"), interceptor)
stderr.Seek(0, os.SEEK_SET)
data, err := ioutil.ReadAll(stderr)
if err != nil {
t.Error(err)
}
assert.Contains(t, string(data), "test-method")
assert.Contains(t, string(data), "test-error")
assert.NotContains(t, string(data), "INFO")
stderr.Seek(0, os.SEEK_SET)
stderr.Truncate(0)
})
os.Stderr = old
}
func callServerInterceptor(err error, interceptor grpc.UnaryServerInterceptor) {
interceptor(context.Background(),
bytes.NewBufferString("test-req"),
&grpc.UnaryServerInfo{
FullMethod: "test-method",
},
func(ctx context.Context, req interface{}) (interface{}, error) { return nil, err })
}
func TestServerLog(t *testing.T) {
stderr, err := ioutil.TempFile(os.TempDir(), "stderr")
if err != nil {
t.Fatal(err)
}
old := os.Stderr
os.Stderr = stderr
t.Logf("capture stderr file: %s", stderr.Name())
t.Run("test no option", func(t *testing.T) {
callServerInterceptor(nil, serverLogging(0))
stderr.Seek(0, os.SEEK_SET)
data, err := ioutil.ReadAll(stderr)
if err != nil {
t.Error(err)
}
assert.Contains(t, string(data), "test-method")
assert.Contains(t, string(data), "test-req")
assert.Contains(t, string(data), "path")
assert.Contains(t, string(data), "ret")
assert.Contains(t, string(data), "ts")
assert.Contains(t, string(data), "grpc-access-log")
stderr.Seek(0, os.SEEK_SET)
stderr.Truncate(0)
})
t.Run("test disable args", func(t *testing.T) {
callServerInterceptor(nil, serverLogging(LogFlagDisableArgs))
stderr.Seek(0, os.SEEK_SET)
data, err := ioutil.ReadAll(stderr)
if err != nil {
t.Error(err)
}
assert.Contains(t, string(data), "test-method")
assert.NotContains(t, string(data), "test-req")
stderr.Seek(0, os.SEEK_SET)
stderr.Truncate(0)
})
t.Run("test no log", func(t *testing.T) {
callServerInterceptor(errors.New("test error"), serverLogging(LogFlagDisable))
stderr.Seek(0, os.SEEK_SET)
data, err := ioutil.ReadAll(stderr)
if err != nil {
t.Error(err)
}
assert.Empty(t, data)
stderr.Seek(0, os.SEEK_SET)
stderr.Truncate(0)
})
t.Run("test multi flag", func(t *testing.T) {
interceptor := serverLogging(LogFlagDisableInfo | LogFlagDisableArgs)
callServerInterceptor(nil, interceptor)
callServerInterceptor(errors.New("test-error"), interceptor)
stderr.Seek(0, os.SEEK_SET)
data, err := ioutil.ReadAll(stderr)
if err != nil {
t.Error(err)
}
assert.Contains(t, string(data), "test-method")
assert.Contains(t, string(data), "test-error")
assert.NotContains(t, string(data), "INFO")
stderr.Seek(0, os.SEEK_SET)
stderr.Truncate(0)
})
os.Stderr = old
}

@ -13,6 +13,7 @@ import (
"github.com/bilibili/kratos/pkg/conf/dsn" "github.com/bilibili/kratos/pkg/conf/dsn"
"github.com/bilibili/kratos/pkg/log" "github.com/bilibili/kratos/pkg/log"
nmd "github.com/bilibili/kratos/pkg/net/metadata" nmd "github.com/bilibili/kratos/pkg/net/metadata"
"github.com/bilibili/kratos/pkg/net/rpc/warden/ratelimiter"
"github.com/bilibili/kratos/pkg/net/trace" "github.com/bilibili/kratos/pkg/net/trace"
xtime "github.com/bilibili/kratos/pkg/time" xtime "github.com/bilibili/kratos/pkg/time"
@ -64,6 +65,9 @@ type ServerConfig struct {
// KeepAliveTimeout is After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that // KeepAliveTimeout is After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that
// the connection is closed. // the connection is closed.
KeepAliveTimeout xtime.Duration `dsn:"query.keepaliveTimeout"` KeepAliveTimeout xtime.Duration `dsn:"query.keepaliveTimeout"`
// LogFlag to control log behaviour. e.g. LogFlag: warden.LogFlagDisableLog.
// Disable: 1 DisableArgs: 2 DisableInfo: 4
LogFlag int8 `dsn:"query.logFlag"`
} }
// Server is the framework's server side instance, it contains the GrpcServer, interceptor and interceptors. // Server is the framework's server side instance, it contains the GrpcServer, interceptor and interceptors.
@ -135,7 +139,6 @@ func (s *Server) handle() grpc.UnaryServerInterceptor {
func init() { func init() {
addFlag(flag.CommandLine) addFlag(flag.CommandLine)
} }
func addFlag(fs *flag.FlagSet) { func addFlag(fs *flag.FlagSet) {
@ -166,6 +169,8 @@ func NewServer(conf *ServerConfig, opt ...grpc.ServerOption) (s *Server) {
fmt.Fprint(os.Stderr, "[warden] please call flag.Parse() before Init warden server, some configure may not effect\n") fmt.Fprint(os.Stderr, "[warden] please call flag.Parse() before Init warden server, some configure may not effect\n")
} }
conf = parseDSN(_grpcDSN) conf = parseDSN(_grpcDSN)
} else {
fmt.Fprintf(os.Stderr, "[warden] config is Deprecated, argument will be ignored. please use -grpc flag or GRPC env to configure warden server.\n")
} }
s = new(Server) s = new(Server)
if err := s.SetConfig(conf); err != nil { if err := s.SetConfig(conf); err != nil {
@ -180,7 +185,8 @@ func NewServer(conf *ServerConfig, opt ...grpc.ServerOption) (s *Server) {
}) })
opt = append(opt, keepParam, grpc.UnaryInterceptor(s.interceptor)) opt = append(opt, keepParam, grpc.UnaryInterceptor(s.interceptor))
s.server = grpc.NewServer(opt...) s.server = grpc.NewServer(opt...)
s.Use(s.recovery(), s.handle(), serverLogging(), s.stats(), s.validate()) s.Use(s.recovery(), s.handle(), serverLogging(conf.LogFlag), s.stats(), s.validate())
s.Use(ratelimiter.New(nil).Limit())
return return
} }
@ -299,8 +305,6 @@ func (s *Server) Start() (*Server, error) {
return nil, err return nil, err
} }
reflection.Register(s.server) reflection.Register(s.server)
log.Info("warden: start grpc listen addr: %s", s.conf.Addr)
go func() { go func() {
if err := s.Serve(lis); err != nil { if err := s.Serve(lis); err != nil {
panic(err) panic(err)

Loading…
Cancel
Save