diff --git a/pkg/net/rpc/warden/CHANGELOG.md b/pkg/net/rpc/warden/CHANGELOG.md index ebaff68ad..79bec5a3e 100644 --- a/pkg/net/rpc/warden/CHANGELOG.md +++ b/pkg/net/rpc/warden/CHANGELOG.md @@ -1,9 +1,21 @@ ### net/rpc/warden +##### Version 1.1.17 +1. 移除 bbr feature flag,默认开启自适应限流 + +##### Version 1.1.16 +1. 使用 flag(grpc.bbr) 绑定 BBR 限流 + +##### Version 1.1.15 +1. warden使用 metadata.Range 方法 + ##### Version 1.1.14 -1. p2c balancer增加filter保护 +1. 为 server log 添加选项 ##### Version 1.1.13 +1. 为 client log 添加选项 + +##### Version 1.1.12 1. 设置 caller 为 no_user 如果 user 不存在 ##### Version 1.1.12 diff --git a/pkg/net/rpc/warden/README.md b/pkg/net/rpc/warden/README.md index d3d062ade..3f8021442 100644 --- a/pkg/net/rpc/warden/README.md +++ b/pkg/net/rpc/warden/README.md @@ -1,5 +1,13 @@ -#### net/rpc/warden +#### net/rcp/warden ##### 项目简介 -gRPC 框架,带来如飞一般的体验。 +来自 bilibili 主站技术部的 RPC 框架,融合主站技术部的核心科技,带来如飞一般的体验。 + +##### 编译环境 + +- **请只用 Golang v1.9.x 以上版本编译执行** + +##### 依赖包 + +- [grpc](google.golang.org/grpc) diff --git a/pkg/net/rpc/warden/client.go b/pkg/net/rpc/warden/client.go index f206ec99c..97a7af882 100644 --- a/pkg/net/rpc/warden/client.go +++ b/pkg/net/rpc/warden/client.go @@ -14,12 +14,12 @@ import ( "github.com/bilibili/kratos/pkg/conf/flagvar" "github.com/bilibili/kratos/pkg/ecode" "github.com/bilibili/kratos/pkg/naming" + "github.com/bilibili/kratos/pkg/naming/discovery" nmd "github.com/bilibili/kratos/pkg/net/metadata" "github.com/bilibili/kratos/pkg/net/netutil/breaker" "github.com/bilibili/kratos/pkg/net/rpc/warden/balancer/p2c" "github.com/bilibili/kratos/pkg/net/rpc/warden/internal/status" "github.com/bilibili/kratos/pkg/net/rpc/warden/resolver" - "github.com/bilibili/kratos/pkg/net/rpc/warden/resolver/direct" "github.com/bilibili/kratos/pkg/net/trace" xtime "github.com/bilibili/kratos/pkg/time" @@ -51,10 +51,6 @@ func baseMetadata() metadata.MD { return gmd } -func init() { - resolver.Register(direct.New()) -} - // ClientConfig is rpc client conf. type ClientConfig struct { Dial xtime.Duration @@ -74,7 +70,7 @@ type Client struct { breaker *breaker.Group mutex sync.RWMutex - opt []grpc.DialOption + opts []grpc.DialOption handlers []grpc.UnaryClientInterceptor } @@ -160,19 +156,20 @@ func NewConn(target string, opt ...grpc.DialOption) (*grpc.ClientConn, error) { // NewClient returns a new blank Client instance with a default client interceptor. // opt can be used to add grpc dial options. func NewClient(conf *ClientConfig, opt ...grpc.DialOption) *Client { + resolver.Register(discovery.Builder()) c := new(Client) if err := c.SetConfig(conf); err != nil { panic(err) } c.UseOpt(grpc.WithBalancerName(p2c.Name)) c.UseOpt(opt...) - c.Use(c.recovery(), clientLogging(), c.handle()) return c } // DefaultClient returns a new default Client instance with a default client interceptor and default dialoption. // opt can be used to add grpc dial options. func DefaultClient() *Client { + resolver.Register(discovery.Builder()) _once.Do(func() { _defaultClient = NewClient(nil) }) @@ -221,21 +218,33 @@ func (c *Client) Use(handlers ...grpc.UnaryClientInterceptor) *Client { } // UseOpt attachs a global grpc DialOption to the Client. -func (c *Client) UseOpt(opt ...grpc.DialOption) *Client { - c.opt = append(c.opt, opt...) +func (c *Client) UseOpt(opts ...grpc.DialOption) *Client { + c.opts = append(c.opts, opts...) return c } -// Dial creates a client connection to the given target. -// Target format is scheme://authority/endpoint?query_arg=value -// example: direct://default/192.168.1.1:9000,192.168.1.2:9001 -func (c *Client) Dial(ctx context.Context, target string, opt ...grpc.DialOption) (conn *grpc.ClientConn, err error) { +func (c *Client) cloneOpts() []grpc.DialOption { + dialOptions := make([]grpc.DialOption, len(c.opts)) + copy(dialOptions, c.opts) + return dialOptions +} + +func (c *Client) dial(ctx context.Context, target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) { + dialOptions := c.cloneOpts() if !c.conf.NonBlock { - c.opt = append(c.opt, grpc.WithBlock()) + dialOptions = append(dialOptions, grpc.WithBlock()) } - c.opt = append(c.opt, grpc.WithInsecure()) - c.opt = append(c.opt, grpc.WithUnaryInterceptor(c.chainUnaryClient())) - c.opt = append(c.opt, opt...) + dialOptions = append(dialOptions, opts...) + + // init default handler + var handlers []grpc.UnaryClientInterceptor + handlers = append(handlers, c.recovery()) + handlers = append(handlers, clientLogging(dialOptions...)) + handlers = append(handlers, c.handlers...) + // NOTE: c.handle must be a last interceptor. + handlers = append(handlers, c.handle()) + + dialOptions = append(dialOptions, grpc.WithUnaryInterceptor(chainUnaryClient(handlers))) c.mutex.RLock() conf := c.conf c.mutex.RUnlock() @@ -268,43 +277,39 @@ func (c *Client) Dial(ctx context.Context, target string, opt ...grpc.DialOption } target = u.String() } - if conn, err = grpc.DialContext(ctx, target, c.opt...); err != nil { + if conn, err = grpc.DialContext(ctx, target, dialOptions...); err != nil { fmt.Fprintf(os.Stderr, "warden client: dial %s error %v!", target, err) } err = errors.WithStack(err) return } +// Dial creates a client connection to the given target. +// Target format is scheme://authority/endpoint?query_arg=value +// example: discovery://default/account.account.service?cluster=shfy01&cluster=shfy02 +func (c *Client) Dial(ctx context.Context, target string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) { + opts = append(opts, grpc.WithInsecure()) + return c.dial(ctx, target, opts...) +} + // DialTLS creates a client connection over tls transport to the given target. -func (c *Client) DialTLS(ctx context.Context, target string, file string, name string) (conn *grpc.ClientConn, err error) { +func (c *Client) DialTLS(ctx context.Context, target string, file string, name string, opts ...grpc.DialOption) (conn *grpc.ClientConn, err error) { var creds credentials.TransportCredentials creds, err = credentials.NewClientTLSFromFile(file, name) if err != nil { err = errors.WithStack(err) return } - c.opt = append(c.opt, grpc.WithBlock()) - c.opt = append(c.opt, grpc.WithTransportCredentials(creds)) - c.opt = append(c.opt, grpc.WithUnaryInterceptor(c.chainUnaryClient())) - c.mutex.RLock() - conf := c.conf - c.mutex.RUnlock() - if conf.Dial > 0 { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, time.Duration(conf.Dial)) - defer cancel() - } - conn, err = grpc.DialContext(ctx, target, c.opt...) - err = errors.WithStack(err) - return + opts = append(opts, grpc.WithTransportCredentials(creds)) + return c.dial(ctx, target, opts...) } // chainUnaryClient creates a single interceptor out of a chain of many interceptors. // // Execution is done in left-to-right order, including passing of context. // For example ChainUnaryClient(one, two, three) will execute one before two before three. -func (c *Client) chainUnaryClient() grpc.UnaryClientInterceptor { - n := len(c.handlers) +func chainUnaryClient(handlers []grpc.UnaryClientInterceptor) grpc.UnaryClientInterceptor { + n := len(handlers) if n == 0 { return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { @@ -323,9 +328,9 @@ func (c *Client) chainUnaryClient() grpc.UnaryClientInterceptor { return invoker(ictx, imethod, ireq, ireply, ic, iopts...) } i++ - return c.handlers[i](ictx, imethod, ireq, ireply, ic, chainHandler, iopts...) + return handlers[i](ictx, imethod, ireq, ireply, ic, chainHandler, iopts...) } - return c.handlers[0](ctx, method, req, reply, cc, chainHandler, opts...) + return handlers[0](ctx, method, req, reply, cc, chainHandler, opts...) } } diff --git a/pkg/net/rpc/warden/client_test.go b/pkg/net/rpc/warden/client_test.go new file mode 100644 index 000000000..96be46368 --- /dev/null +++ b/pkg/net/rpc/warden/client_test.go @@ -0,0 +1,34 @@ +package warden + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" +) + +func TestChainUnaryClient(t *testing.T) { + var orders []string + factory := func(name string) grpc.UnaryClientInterceptor { + return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + orders = append(orders, name+"-in") + err := invoker(ctx, method, req, reply, cc, opts...) + orders = append(orders, name+"-out") + return err + } + } + handlers := []grpc.UnaryClientInterceptor{factory("h1"), factory("h2"), factory("h3")} + interceptor := chainUnaryClient(handlers) + interceptor(context.Background(), "test", nil, nil, nil, func(context.Context, string, interface{}, interface{}, *grpc.ClientConn, ...grpc.CallOption) error { + return nil + }) + assert.Equal(t, []string{ + "h1-in", + "h2-in", + "h3-in", + "h3-out", + "h2-out", + "h1-out", + }, orders) +} diff --git a/pkg/net/rpc/warden/internal/pb/ecode.proto b/pkg/net/rpc/warden/internal/pb/ecode.proto index bd50e7207..1a18c6957 100644 --- a/pkg/net/rpc/warden/internal/pb/ecode.proto +++ b/pkg/net/rpc/warden/internal/pb/ecode.proto @@ -4,7 +4,7 @@ package pb; import "google/protobuf/any.proto"; -option go_package = "go-common/library/ecode/pb"; +option go_package = "github.com/bilibili/kratos/pkg/net/rpc/warden/internal/pb"; message Error { int32 err_code = 1; diff --git a/pkg/net/rpc/warden/logging.go b/pkg/net/rpc/warden/logging.go index 5f7f573e3..7bd8c10a4 100644 --- a/pkg/net/rpc/warden/logging.go +++ b/pkg/net/rpc/warden/logging.go @@ -20,6 +20,50 @@ var ( statsServer = stat.RPCServer ) +// Warden Log Flag +const ( + // disable all log. + LogFlagDisable = 1 << iota + // disable print args on log. + LogFlagDisableArgs + // disable info level log. + LogFlagDisableInfo +) + +type logOption struct { + grpc.EmptyDialOption + grpc.EmptyCallOption + flag int8 +} + +// WithLogFlag disable client access log. +func WithLogFlag(flag int8) grpc.CallOption { + return logOption{flag: flag} +} + +// WithDialLogFlag set client level log behaviour. +func WithDialLogFlag(flag int8) grpc.DialOption { + return logOption{flag: flag} +} + +func extractLogCallOption(opts []grpc.CallOption) (flag int8) { + for _, opt := range opts { + if logOpt, ok := opt.(logOption); ok { + return logOpt.flag + } + } + return +} + +func extractLogDialOption(opts []grpc.DialOption) (flag int8) { + for _, opt := range opts { + if logOpt, ok := opt.(logOption); ok { + return logOpt.flag + } + } + return +} + func logFn(code int, dt time.Duration) func(context.Context, ...log.D) { switch { case code < 0: @@ -34,8 +78,11 @@ func logFn(code int, dt time.Duration) func(context.Context, ...log.D) { } // clientLogging warden grpc logging -func clientLogging() grpc.UnaryClientInterceptor { +func clientLogging(dialOptions ...grpc.DialOption) grpc.UnaryClientInterceptor { + defaultFlag := extractLogDialOption(dialOptions) return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + logFlag := extractLogCallOption(opts) | defaultFlag + startTime := time.Now() var peerInfo peer.Peer opts = append(opts, grpc.Peer(&peerInfo)) @@ -50,21 +97,27 @@ func clientLogging() grpc.UnaryClientInterceptor { statsClient.Timing(method, int64(duration/time.Millisecond)) statsClient.Incr(method, strconv.Itoa(code)) - var ip string + if logFlag&LogFlagDisable != 0 { + return err + } + // TODO: find better way to deal with slow log. + if logFlag&LogFlagDisableInfo != 0 && err == nil && duration < 500*time.Millisecond { + return err + } + logFields := make([]log.D, 0, 7) + logFields = append(logFields, log.KVString("path", method)) + logFields = append(logFields, log.KVInt("ret", code)) + logFields = append(logFields, log.KVFloat64("ts", duration.Seconds())) + logFields = append(logFields, log.KVString("source", "grpc-access-log")) if peerInfo.Addr != nil { - ip = peerInfo.Addr.String() + logFields = append(logFields, log.KVString("ip", peerInfo.Addr.String())) } - logFields := []log.D{ - log.KVString("ip", ip), - log.KVString("path", method), - log.KVInt("ret", code), + if logFlag&LogFlagDisableArgs == 0 { // TODO: it will panic if someone remove String method from protobuf message struct that auto generate from protoc. - log.KVString("args", req.(fmt.Stringer).String()), - log.KVFloat64("ts", duration.Seconds()), - log.KVString("source", "grpc-access-log"), + logFields = append(logFields, log.KVString("args", req.(fmt.Stringer).String())) } if err != nil { - logFields = append(logFields, log.KV("error", err.Error()), log.KVString("stack", fmt.Sprintf("%+v", err))) + logFields = append(logFields, log.KVString("error", err.Error()), log.KVString("stack", fmt.Sprintf("%+v", err))) } logFn(code, duration)(ctx, logFields...) return err @@ -72,7 +125,7 @@ func clientLogging() grpc.UnaryClientInterceptor { } // serverLogging warden grpc logging -func serverLogging() grpc.UnaryServerInterceptor { +func serverLogging(logFlag int8) grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { startTime := time.Now() caller := metadata.String(ctx, metadata.Caller) @@ -94,23 +147,32 @@ func serverLogging() grpc.UnaryServerInterceptor { // after server response code := ecode.Cause(err).Code() duration := time.Since(startTime) - // monitor statsServer.Timing(caller, int64(duration/time.Millisecond), info.FullMethod) statsServer.Incr(caller, info.FullMethod, strconv.Itoa(code)) + + if logFlag&LogFlagDisable != 0 { + return resp, err + } + // TODO: find better way to deal with slow log. + if logFlag&LogFlagDisableInfo != 0 && err == nil && duration < 500*time.Millisecond { + return resp, err + } logFields := []log.D{ log.KVString("user", caller), log.KVString("ip", remoteIP), log.KVString("path", info.FullMethod), log.KVInt("ret", code), - // TODO: it will panic if someone remove String method from protobuf message struct that auto generate from protoc. - log.KVString("args", req.(fmt.Stringer).String()), log.KVFloat64("ts", duration.Seconds()), log.KVFloat64("timeout_quota", quota), log.KVString("source", "grpc-access-log"), } + if logFlag&LogFlagDisableArgs == 0 { + // TODO: it will panic if someone remove String method from protobuf message struct that auto generate from protoc. + logFields = append(logFields, log.KVString("args", req.(fmt.Stringer).String())) + } if err != nil { - logFields = append(logFields, log.KV("error", err.Error()), log.KV("stack", fmt.Sprintf("%+v", err))) + logFields = append(logFields, log.KVString("error", err.Error()), log.KVString("stack", fmt.Sprintf("%+v", err))) } logFn(code, duration)(ctx, logFields...) return resp, err diff --git a/pkg/net/rpc/warden/logging_test.go b/pkg/net/rpc/warden/logging_test.go index 92b04c725..c8c829f7b 100644 --- a/pkg/net/rpc/warden/logging_test.go +++ b/pkg/net/rpc/warden/logging_test.go @@ -1,11 +1,18 @@ package warden import ( + "bytes" "context" + "errors" + "io/ioutil" + "os" "reflect" "testing" "time" + "github.com/stretchr/testify/assert" + "google.golang.org/grpc" + "github.com/bilibili/kratos/pkg/log" ) @@ -53,3 +60,235 @@ func Test_logFn(t *testing.T) { }) } } + +func callInterceptor(err error, interceptor grpc.UnaryClientInterceptor, opts ...grpc.CallOption) { + interceptor(context.Background(), + "test-method", + bytes.NewBufferString("test-req"), + "test_reply", + &grpc.ClientConn{}, + func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, opts ...grpc.CallOption) error { + return err + }, opts...) +} + +func TestClientLog(t *testing.T) { + stderr, err := ioutil.TempFile(os.TempDir(), "stderr") + if err != nil { + t.Fatal(err) + } + old := os.Stderr + os.Stderr = stderr + t.Logf("capture stderr file: %s", stderr.Name()) + + t.Run("test no option", func(t *testing.T) { + callInterceptor(nil, clientLogging()) + + stderr.Seek(0, os.SEEK_SET) + + data, err := ioutil.ReadAll(stderr) + if err != nil { + t.Error(err) + } + assert.Contains(t, string(data), "test-method") + assert.Contains(t, string(data), "test-req") + assert.Contains(t, string(data), "path") + assert.Contains(t, string(data), "ret") + assert.Contains(t, string(data), "ts") + assert.Contains(t, string(data), "grpc-access-log") + + stderr.Seek(0, os.SEEK_SET) + stderr.Truncate(0) + }) + + t.Run("test disable args", func(t *testing.T) { + callInterceptor(nil, clientLogging(WithDialLogFlag(LogFlagDisableArgs))) + + stderr.Seek(0, os.SEEK_SET) + + data, err := ioutil.ReadAll(stderr) + if err != nil { + t.Error(err) + } + assert.Contains(t, string(data), "test-method") + assert.NotContains(t, string(data), "test-req") + + stderr.Seek(0, os.SEEK_SET) + stderr.Truncate(0) + }) + + t.Run("test disable args and disable info", func(t *testing.T) { + callInterceptor(nil, clientLogging(WithDialLogFlag(LogFlagDisableArgs|LogFlagDisableInfo))) + callInterceptor(errors.New("test-error"), clientLogging(WithDialLogFlag(LogFlagDisableArgs|LogFlagDisableInfo))) + + stderr.Seek(0, os.SEEK_SET) + + data, err := ioutil.ReadAll(stderr) + if err != nil { + t.Error(err) + } + assert.Contains(t, string(data), "test-method") + assert.Contains(t, string(data), "test-error") + assert.NotContains(t, string(data), "INFO") + + stderr.Seek(0, os.SEEK_SET) + stderr.Truncate(0) + }) + + t.Run("test call option", func(t *testing.T) { + callInterceptor(nil, clientLogging(), WithLogFlag(LogFlagDisableArgs)) + + stderr.Seek(0, os.SEEK_SET) + + data, err := ioutil.ReadAll(stderr) + if err != nil { + t.Error(err) + } + assert.Contains(t, string(data), "test-method") + assert.NotContains(t, string(data), "test-req") + + stderr.Seek(0, os.SEEK_SET) + stderr.Truncate(0) + }) + + t.Run("test combine option", func(t *testing.T) { + interceptor := clientLogging(WithDialLogFlag(LogFlagDisableInfo)) + callInterceptor(nil, interceptor, WithLogFlag(LogFlagDisableArgs)) + callInterceptor(errors.New("test-error"), interceptor, WithLogFlag(LogFlagDisableArgs)) + + stderr.Seek(0, os.SEEK_SET) + + data, err := ioutil.ReadAll(stderr) + if err != nil { + t.Error(err) + } + assert.Contains(t, string(data), "test-method") + assert.Contains(t, string(data), "test-error") + assert.NotContains(t, string(data), "INFO") + + stderr.Seek(0, os.SEEK_SET) + stderr.Truncate(0) + }) + t.Run("test no log", func(t *testing.T) { + callInterceptor(errors.New("test error"), clientLogging(WithDialLogFlag(LogFlagDisable))) + stderr.Seek(0, os.SEEK_SET) + + data, err := ioutil.ReadAll(stderr) + if err != nil { + t.Error(err) + } + assert.Empty(t, data) + + stderr.Seek(0, os.SEEK_SET) + stderr.Truncate(0) + }) + + t.Run("test multi flag", func(t *testing.T) { + interceptor := clientLogging(WithDialLogFlag(LogFlagDisableInfo | LogFlagDisableArgs)) + callInterceptor(nil, interceptor) + callInterceptor(errors.New("test-error"), interceptor) + + stderr.Seek(0, os.SEEK_SET) + + data, err := ioutil.ReadAll(stderr) + if err != nil { + t.Error(err) + } + assert.Contains(t, string(data), "test-method") + assert.Contains(t, string(data), "test-error") + assert.NotContains(t, string(data), "INFO") + + stderr.Seek(0, os.SEEK_SET) + stderr.Truncate(0) + }) + os.Stderr = old +} + +func callServerInterceptor(err error, interceptor grpc.UnaryServerInterceptor) { + interceptor(context.Background(), + bytes.NewBufferString("test-req"), + &grpc.UnaryServerInfo{ + FullMethod: "test-method", + }, + func(ctx context.Context, req interface{}) (interface{}, error) { return nil, err }) +} + +func TestServerLog(t *testing.T) { + stderr, err := ioutil.TempFile(os.TempDir(), "stderr") + if err != nil { + t.Fatal(err) + } + old := os.Stderr + os.Stderr = stderr + t.Logf("capture stderr file: %s", stderr.Name()) + + t.Run("test no option", func(t *testing.T) { + callServerInterceptor(nil, serverLogging(0)) + + stderr.Seek(0, os.SEEK_SET) + + data, err := ioutil.ReadAll(stderr) + if err != nil { + t.Error(err) + } + assert.Contains(t, string(data), "test-method") + assert.Contains(t, string(data), "test-req") + assert.Contains(t, string(data), "path") + assert.Contains(t, string(data), "ret") + assert.Contains(t, string(data), "ts") + assert.Contains(t, string(data), "grpc-access-log") + + stderr.Seek(0, os.SEEK_SET) + stderr.Truncate(0) + }) + + t.Run("test disable args", func(t *testing.T) { + callServerInterceptor(nil, serverLogging(LogFlagDisableArgs)) + + stderr.Seek(0, os.SEEK_SET) + + data, err := ioutil.ReadAll(stderr) + if err != nil { + t.Error(err) + } + assert.Contains(t, string(data), "test-method") + assert.NotContains(t, string(data), "test-req") + + stderr.Seek(0, os.SEEK_SET) + stderr.Truncate(0) + }) + + t.Run("test no log", func(t *testing.T) { + callServerInterceptor(errors.New("test error"), serverLogging(LogFlagDisable)) + stderr.Seek(0, os.SEEK_SET) + + data, err := ioutil.ReadAll(stderr) + if err != nil { + t.Error(err) + } + assert.Empty(t, data) + + stderr.Seek(0, os.SEEK_SET) + stderr.Truncate(0) + }) + + t.Run("test multi flag", func(t *testing.T) { + interceptor := serverLogging(LogFlagDisableInfo | LogFlagDisableArgs) + callServerInterceptor(nil, interceptor) + callServerInterceptor(errors.New("test-error"), interceptor) + + stderr.Seek(0, os.SEEK_SET) + + data, err := ioutil.ReadAll(stderr) + if err != nil { + t.Error(err) + } + assert.Contains(t, string(data), "test-method") + assert.Contains(t, string(data), "test-error") + assert.NotContains(t, string(data), "INFO") + + stderr.Seek(0, os.SEEK_SET) + stderr.Truncate(0) + }) + os.Stderr = old +} diff --git a/pkg/net/rpc/warden/server.go b/pkg/net/rpc/warden/server.go index 0caa86f5c..a187265dd 100644 --- a/pkg/net/rpc/warden/server.go +++ b/pkg/net/rpc/warden/server.go @@ -13,6 +13,7 @@ import ( "github.com/bilibili/kratos/pkg/conf/dsn" "github.com/bilibili/kratos/pkg/log" nmd "github.com/bilibili/kratos/pkg/net/metadata" + "github.com/bilibili/kratos/pkg/net/rpc/warden/ratelimiter" "github.com/bilibili/kratos/pkg/net/trace" xtime "github.com/bilibili/kratos/pkg/time" @@ -64,6 +65,9 @@ type ServerConfig struct { // KeepAliveTimeout is After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that // the connection is closed. KeepAliveTimeout xtime.Duration `dsn:"query.keepaliveTimeout"` + // LogFlag to control log behaviour. e.g. LogFlag: warden.LogFlagDisableLog. + // Disable: 1 DisableArgs: 2 DisableInfo: 4 + LogFlag int8 `dsn:"query.logFlag"` } // Server is the framework's server side instance, it contains the GrpcServer, interceptor and interceptors. @@ -135,7 +139,6 @@ func (s *Server) handle() grpc.UnaryServerInterceptor { func init() { addFlag(flag.CommandLine) - } func addFlag(fs *flag.FlagSet) { @@ -166,6 +169,8 @@ func NewServer(conf *ServerConfig, opt ...grpc.ServerOption) (s *Server) { fmt.Fprint(os.Stderr, "[warden] please call flag.Parse() before Init warden server, some configure may not effect\n") } conf = parseDSN(_grpcDSN) + } else { + fmt.Fprintf(os.Stderr, "[warden] config is Deprecated, argument will be ignored. please use -grpc flag or GRPC env to configure warden server.\n") } s = new(Server) if err := s.SetConfig(conf); err != nil { @@ -180,7 +185,8 @@ func NewServer(conf *ServerConfig, opt ...grpc.ServerOption) (s *Server) { }) opt = append(opt, keepParam, grpc.UnaryInterceptor(s.interceptor)) s.server = grpc.NewServer(opt...) - s.Use(s.recovery(), s.handle(), serverLogging(), s.stats(), s.validate()) + s.Use(s.recovery(), s.handle(), serverLogging(conf.LogFlag), s.stats(), s.validate()) + s.Use(ratelimiter.New(nil).Limit()) return } @@ -299,8 +305,6 @@ func (s *Server) Start() (*Server, error) { return nil, err } reflection.Register(s.server) - - log.Info("warden: start grpc listen addr: %s", s.conf.Addr) go func() { if err := s.Serve(lis); err != nil { panic(err)