support some feature and fix some bug (#295)

1. bm perf default use engine port
2. add grpc gzip import
3. fix discovery client register addrs bug
pull/298/head
Felix Hao 5 years ago committed by GitHub
parent 2c6eb487a3
commit af3aaf073c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      pkg/cache/metrics.go
  2. 8
      pkg/cache/redis/pool.go
  3. 4
      pkg/naming/discovery/discovery.go
  4. 37
      pkg/net/http/blademaster/perf.go
  5. 2
      pkg/net/http/blademaster/server.go
  6. 1
      pkg/net/rpc/warden/server.go

@ -4,6 +4,7 @@ import "github.com/bilibili/kratos/pkg/stat/metric"
const _metricNamespace = "cache" const _metricNamespace = "cache"
// be used in tool/kratos-gen-bts
var ( var (
MetricHits = metric.NewCounterVec(&metric.CounterVecOpts{ MetricHits = metric.NewCounterVec(&metric.CounterVecOpts{
Namespace: _metricNamespace, Namespace: _metricNamespace,

@ -193,7 +193,9 @@ func (pc *pooledConnection) Do(commandName string, args ...interface{}) (reply i
ci := LookupCommandInfo(commandName) ci := LookupCommandInfo(commandName)
pc.state = (pc.state | ci.Set) &^ ci.Clear pc.state = (pc.state | ci.Set) &^ ci.Clear
reply, err = pc.c.Do(commandName, args...) reply, err = pc.c.Do(commandName, args...)
pc.p.statfunc(pc.p.c.Name, pc.p.c.Addr, commandName, now, err)() if pc.p.statfunc != nil {
pc.p.statfunc(pc.p.c.Name, pc.p.c.Addr, commandName, now, err)()
}
return return
} }
@ -217,7 +219,9 @@ func (pc *pooledConnection) Receive() (reply interface{}, err error) {
if len(pc.cmds) > 0 { if len(pc.cmds) > 0 {
cmd := pc.cmds[0] cmd := pc.cmds[0]
pc.cmds = pc.cmds[1:] pc.cmds = pc.cmds[1:]
pc.p.statfunc(pc.p.c.Name, pc.p.c.Addr, cmd, pc.now, err)() if pc.p.statfunc != nil {
pc.p.statfunc(pc.p.c.Name, pc.p.c.Addr, cmd, pc.now, err)()
}
} }
return return
} }

@ -370,7 +370,9 @@ func (d *Discovery) register(ctx context.Context, ins *naming.Instance) (err err
uri := fmt.Sprintf(_registerURL, d.pickNode()) uri := fmt.Sprintf(_registerURL, d.pickNode())
params := d.newParams(c) params := d.newParams(c)
params.Set("appid", ins.AppID) params.Set("appid", ins.AppID)
params.Set("addrs", strings.Join(ins.Addrs, ",")) for _, addr := range ins.Addrs {
params.Add("addrs", addr)
}
params.Set("version", ins.Version) params.Set("version", ins.Version)
params.Set("status", _statusUP) params.Set("status", _statusUP)
params.Set("metadata", string(metadata)) params.Set("metadata", string(metadata))

@ -19,28 +19,45 @@ var (
func init() { func init() {
v := os.Getenv("HTTP_PERF") v := os.Getenv("HTTP_PERF")
if v == "" {
v = "tcp://0.0.0.0:2333"
}
flag.StringVar(&_perfDSN, "http.perf", v, "listen http perf dsn, or use HTTP_PERF env variable.") flag.StringVar(&_perfDSN, "http.perf", v, "listen http perf dsn, or use HTTP_PERF env variable.")
} }
func startPerf() { func startPerf(engine *Engine) {
_perfOnce.Do(func() { _perfOnce.Do(func() {
mux := http.NewServeMux() if os.Getenv("HTTP_PERF") == "" {
mux.HandleFunc("/debug/pprof/", pprof.Index) prefixRouter := engine.Group("/debug/pprof")
mux.HandleFunc("/debug/pprof/cmdline", pprof.Cmdline) {
mux.HandleFunc("/debug/pprof/profile", pprof.Profile) prefixRouter.GET("/", pprofHandler(pprof.Index))
mux.HandleFunc("/debug/pprof/symbol", pprof.Symbol) prefixRouter.GET("/cmdline", pprofHandler(pprof.Cmdline))
prefixRouter.GET("/profile", pprofHandler(pprof.Profile))
prefixRouter.POST("/symbol", pprofHandler(pprof.Symbol))
prefixRouter.GET("/symbol", pprofHandler(pprof.Symbol))
prefixRouter.GET("/trace", pprofHandler(pprof.Trace))
prefixRouter.GET("/allocs", pprofHandler(pprof.Handler("allocs").ServeHTTP))
prefixRouter.GET("/block", pprofHandler(pprof.Handler("block").ServeHTTP))
prefixRouter.GET("/goroutine", pprofHandler(pprof.Handler("goroutine").ServeHTTP))
prefixRouter.GET("/heap", pprofHandler(pprof.Handler("heap").ServeHTTP))
prefixRouter.GET("/mutex", pprofHandler(pprof.Handler("mutex").ServeHTTP))
prefixRouter.GET("/threadcreate", pprofHandler(pprof.Handler("threadcreate").ServeHTTP))
}
return
}
go func() { go func() {
d, err := dsn.Parse(_perfDSN) d, err := dsn.Parse(_perfDSN)
if err != nil { if err != nil {
panic(errors.Errorf("blademaster: http perf dsn must be tcp://$host:port, %s:error(%v)", _perfDSN, err)) panic(errors.Errorf("blademaster: http perf dsn must be tcp://$host:port, %s:error(%v)", _perfDSN, err))
} }
if err := http.ListenAndServe(d.Host, mux); err != nil { if err := http.ListenAndServe(d.Host, nil); err != nil {
panic(errors.Errorf("blademaster: listen %s: error(%v)", d.Host, err)) panic(errors.Errorf("blademaster: listen %s: error(%v)", d.Host, err))
} }
}() }()
}) })
} }
func pprofHandler(h http.HandlerFunc) HandlerFunc {
handler := http.HandlerFunc(h)
return func(c *Context) {
handler.ServeHTTP(c.Writer, c.Request)
}
}

@ -195,7 +195,7 @@ func NewServer(conf *ServerConfig) *Engine {
c.Bytes(405, "text/plain", []byte(http.StatusText(405))) c.Bytes(405, "text/plain", []byte(http.StatusText(405)))
c.Abort() c.Abort()
}) })
startPerf() startPerf(engine)
return engine return engine
} }

@ -23,6 +23,7 @@ import (
"github.com/pkg/errors" "github.com/pkg/errors"
"google.golang.org/grpc" "google.golang.org/grpc"
_ "google.golang.org/grpc/encoding/gzip" // NOTE: use grpc gzip by header grpc-accept-encoding
"google.golang.org/grpc/keepalive" "google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer" "google.golang.org/grpc/peer"

Loading…
Cancel
Save