package warden import ( "context" "flag" "fmt" "math" "net" "os" "sync" "time" "github.com/bilibili/kratos/pkg/conf/dsn" "github.com/bilibili/kratos/pkg/log" nmd "github.com/bilibili/kratos/pkg/net/metadata" "github.com/bilibili/kratos/pkg/net/rpc/warden/ratelimiter" "github.com/bilibili/kratos/pkg/net/trace" xtime "github.com/bilibili/kratos/pkg/time" //this package is for json format response _ "github.com/bilibili/kratos/pkg/net/rpc/warden/internal/encoding/json" "github.com/bilibili/kratos/pkg/net/rpc/warden/internal/status" "github.com/pkg/errors" "google.golang.org/grpc" _ "google.golang.org/grpc/encoding/gzip" // NOTE: use grpc gzip by header grpc-accept-encoding "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" "google.golang.org/grpc/reflection" ) var ( _grpcDSN string _defaultSerConf = &ServerConfig{ Network: "tcp", Addr: "0.0.0.0:9000", Timeout: xtime.Duration(time.Second), IdleTimeout: xtime.Duration(time.Second * 60), MaxLifeTime: xtime.Duration(time.Hour * 2), ForceCloseWait: xtime.Duration(time.Second * 20), KeepAliveInterval: xtime.Duration(time.Second * 60), KeepAliveTimeout: xtime.Duration(time.Second * 20), } _abortIndex int8 = math.MaxInt8 / 2 ) // ServerConfig is rpc server conf. type ServerConfig struct { // Network is grpc listen network,default value is tcp Network string `dsn:"network"` // Addr is grpc listen addr,default value is 0.0.0.0:9000 Addr string `dsn:"address"` // Timeout is context timeout for per rpc call. Timeout xtime.Duration `dsn:"query.timeout"` // IdleTimeout is a duration for the amount of time after which an idle connection would be closed by sending a GoAway. // Idleness duration is defined since the most recent time the number of outstanding RPCs became zero or the connection establishment. IdleTimeout xtime.Duration `dsn:"query.idleTimeout"` // MaxLifeTime is a duration for the maximum amount of time a connection may exist before it will be closed by sending a GoAway. // A random jitter of +/-10% will be added to MaxConnectionAge to spread out connection storms. MaxLifeTime xtime.Duration `dsn:"query.maxLife"` // ForceCloseWait is an additive period after MaxLifeTime after which the connection will be forcibly closed. ForceCloseWait xtime.Duration `dsn:"query.closeWait"` // KeepAliveInterval is after a duration of this time if the server doesn't see any activity it pings the client to see if the transport is still alive. KeepAliveInterval xtime.Duration `dsn:"query.keepaliveInterval"` // KeepAliveTimeout is After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that // the connection is closed. KeepAliveTimeout xtime.Duration `dsn:"query.keepaliveTimeout"` // LogFlag to control log behaviour. e.g. LogFlag: warden.LogFlagDisableLog. // Disable: 1 DisableArgs: 2 DisableInfo: 4 LogFlag int8 `dsn:"query.logFlag"` } // Server is the framework's server side instance, it contains the GrpcServer, interceptor and interceptors. // Create an instance of Server, by using NewServer(). type Server struct { conf *ServerConfig mutex sync.RWMutex server *grpc.Server handlers []grpc.UnaryServerInterceptor } // handle return a new unary server interceptor for OpenTracing\Logging\LinkTimeout. func (s *Server) handle() grpc.UnaryServerInterceptor { return func(ctx context.Context, req interface{}, args *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { var ( cancel func() addr string ) s.mutex.RLock() conf := s.conf s.mutex.RUnlock() // get derived timeout from grpc context, // compare with the warden configured, // and use the minimum one timeout := time.Duration(conf.Timeout) if dl, ok := ctx.Deadline(); ok { ctimeout := time.Until(dl) if ctimeout-time.Millisecond*20 > 0 { ctimeout = ctimeout - time.Millisecond*20 } if timeout > ctimeout { timeout = ctimeout } } ctx, cancel = context.WithTimeout(ctx, timeout) defer cancel() // get grpc metadata(trace & remote_ip & color) var t trace.Trace cmd := nmd.MD{} if gmd, ok := metadata.FromIncomingContext(ctx); ok { t, _ = trace.Extract(trace.GRPCFormat, gmd) for key, vals := range gmd { if nmd.IsIncomingKey(key) { cmd[key] = vals[0] } } } if t == nil { t = trace.New(args.FullMethod) } else { t.SetTitle(args.FullMethod) } if pr, ok := peer.FromContext(ctx); ok { addr = pr.Addr.String() t.SetTag(trace.String(trace.TagAddress, addr)) } defer t.Finish(&err) // use common meta data context instead of grpc context ctx = nmd.NewContext(ctx, cmd) ctx = trace.NewContext(ctx, t) resp, err = handler(ctx, req) return resp, status.FromError(err).Err() } } func init() { addFlag(flag.CommandLine) } func addFlag(fs *flag.FlagSet) { v := os.Getenv("GRPC") if v == "" { v = "tcp://0.0.0.0:9000/?timeout=1s&idle_timeout=60s" } fs.StringVar(&_grpcDSN, "grpc", v, "listen grpc dsn, or use GRPC env variable.") fs.Var(&_grpcTarget, "grpc.target", "usage: -grpc.target=seq.service=127.0.0.1:9000 -grpc.target=fav.service=192.168.10.1:9000") } func parseDSN(rawdsn string) *ServerConfig { conf := new(ServerConfig) d, err := dsn.Parse(rawdsn) if err != nil { panic(errors.WithMessage(err, fmt.Sprintf("warden: invalid dsn: %s", rawdsn))) } if _, err = d.Bind(conf); err != nil { panic(errors.WithMessage(err, fmt.Sprintf("warden: invalid dsn: %s", rawdsn))) } return conf } // NewServer returns a new blank Server instance with a default server interceptor. func NewServer(conf *ServerConfig, opt ...grpc.ServerOption) (s *Server) { if conf == nil { if !flag.Parsed() { fmt.Fprint(os.Stderr, "[warden] please call flag.Parse() before Init warden server, some configure may not effect\n") } conf = parseDSN(_grpcDSN) } else { fmt.Fprintf(os.Stderr, "[warden] config is Deprecated, argument will be ignored. please use -grpc flag or GRPC env to configure warden server.\n") } s = new(Server) if err := s.SetConfig(conf); err != nil { panic(errors.Errorf("warden: set config failed!err: %s", err.Error())) } keepParam := grpc.KeepaliveParams(keepalive.ServerParameters{ MaxConnectionIdle: time.Duration(s.conf.IdleTimeout), MaxConnectionAgeGrace: time.Duration(s.conf.ForceCloseWait), Time: time.Duration(s.conf.KeepAliveInterval), Timeout: time.Duration(s.conf.KeepAliveTimeout), MaxConnectionAge: time.Duration(s.conf.MaxLifeTime), }) opt = append(opt, keepParam, grpc.UnaryInterceptor(s.interceptor)) s.server = grpc.NewServer(opt...) s.Use(s.recovery(), s.handle(), serverLogging(conf.LogFlag), s.stats(), s.validate()) s.Use(ratelimiter.New(nil).Limit()) return } // SetConfig hot reloads server config func (s *Server) SetConfig(conf *ServerConfig) (err error) { if conf == nil { conf = _defaultSerConf } if conf.Timeout <= 0 { conf.Timeout = xtime.Duration(time.Second) } if conf.IdleTimeout <= 0 { conf.IdleTimeout = xtime.Duration(time.Second * 60) } if conf.MaxLifeTime <= 0 { conf.MaxLifeTime = xtime.Duration(time.Hour * 2) } if conf.ForceCloseWait <= 0 { conf.ForceCloseWait = xtime.Duration(time.Second * 20) } if conf.KeepAliveInterval <= 0 { conf.KeepAliveInterval = xtime.Duration(time.Second * 60) } if conf.KeepAliveTimeout <= 0 { conf.KeepAliveTimeout = xtime.Duration(time.Second * 20) } if conf.Addr == "" { conf.Addr = "0.0.0.0:9000" } if conf.Network == "" { conf.Network = "tcp" } s.mutex.Lock() s.conf = conf s.mutex.Unlock() return nil } // interceptor is a single interceptor out of a chain of many interceptors. // Execution is done in left-to-right order, including passing of context. // For example ChainUnaryServer(one, two, three) will execute one before two before three, and three // will see context changes of one and two. func (s *Server) interceptor(ctx context.Context, req interface{}, args *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { var ( i int chain grpc.UnaryHandler ) n := len(s.handlers) if n == 0 { return handler(ctx, req) } chain = func(ic context.Context, ir interface{}) (interface{}, error) { if i == n-1 { return handler(ic, ir) } i++ return s.handlers[i](ic, ir, args, chain) } return s.handlers[0](ctx, req, args, chain) } // Server return the grpc server for registering service. func (s *Server) Server() *grpc.Server { return s.server } // Use attachs a global inteceptor to the server. // For example, this is the right place for a rate limiter or error management inteceptor. func (s *Server) Use(handlers ...grpc.UnaryServerInterceptor) *Server { finalSize := len(s.handlers) + len(handlers) if finalSize >= int(_abortIndex) { panic("warden: server use too many handlers") } mergedHandlers := make([]grpc.UnaryServerInterceptor, finalSize) copy(mergedHandlers, s.handlers) copy(mergedHandlers[len(s.handlers):], handlers) s.handlers = mergedHandlers return s } // Run create a tcp listener and start goroutine for serving each incoming request. // Run will return a non-nil error unless Stop or GracefulStop is called. func (s *Server) Run(addr string) error { lis, err := net.Listen("tcp", addr) if err != nil { err = errors.WithStack(err) log.Error("failed to listen: %v", err) return err } reflection.Register(s.server) return s.Serve(lis) } // RunUnix create a unix listener and start goroutine for serving each incoming request. // RunUnix will return a non-nil error unless Stop or GracefulStop is called. func (s *Server) RunUnix(file string) error { lis, err := net.Listen("unix", file) if err != nil { err = errors.WithStack(err) log.Error("failed to listen: %v", err) return err } reflection.Register(s.server) return s.Serve(lis) } // Start create a new goroutine run server with configured listen addr // will panic if any error happend // return server itself func (s *Server) Start() (*Server, error) { lis, err := net.Listen(s.conf.Network, s.conf.Addr) if err != nil { return nil, err } log.Info("warden: start grpc listen addr: %v", lis.Addr()) reflection.Register(s.server) go func() { if err := s.Serve(lis); err != nil { panic(err) } }() return s, nil } // Serve accepts incoming connections on the listener lis, creating a new // ServerTransport and service goroutine for each. // Serve will return a non-nil error unless Stop or GracefulStop is called. func (s *Server) Serve(lis net.Listener) error { return s.server.Serve(lis) } // Shutdown stops the server gracefully. It stops the server from // accepting new connections and RPCs and blocks until all the pending RPCs are // finished or the context deadline is reached. func (s *Server) Shutdown(ctx context.Context) (err error) { ch := make(chan struct{}) go func() { s.server.GracefulStop() close(ch) }() select { case <-ctx.Done(): s.server.Stop() err = ctx.Err() case <-ch: } return }