criticality and client criticality with bm

pull/112/head
realityone 6 years ago
parent 457908d64b
commit b59ec79e5c
No known key found for this signature in database
GPG Key ID: ABD4DB50620408C3
  1. 57
      pkg/net/criticality/criticality.go
  2. 5
      pkg/net/http/blademaster/client.go
  3. 21
      pkg/net/http/blademaster/criticality.go
  4. 29
      pkg/net/http/blademaster/metadata.go
  5. 15
      pkg/net/http/blademaster/server.go
  6. 137
      pkg/net/http/blademaster/server_test.go
  7. 12
      pkg/net/metadata/key.go

@ -0,0 +1,57 @@
package criticality
// Criticality is
type Criticality string
// criticality
var (
// EmptyCriticality is used to mark any invalid criticality, and the empty criticality will be parsed as the default criticality later.
EmptyCriticality = Criticality("")
// CriticalPlus is reserved for the most critical requests, those that will result in serious user-visible impact if they fail.
CriticalPlus = Criticality("CRITICAL_PLUS")
// Critical is the default value for requests sent from production jobs. These requests will result in user-visible impact, but the impact may be less severe than those of CRITICAL_PLUS. Services are expected to provision enough capacity for all expected CRITICAL and CRITICAL_PLUS traffic.
Critical = Criticality("CRITICAL")
// SheddablePlus is traffic for which partial unavailability is expected. This is the default for batch jobs, which can retry requests minutes or even hours later.
SheddablePlus = Criticality("SHEDDABLE_PLUS")
// Sheddable is traffic for which frequent partial unavailability and occasional full unavailability is expected.
Sheddable = Criticality("SHEDDABLE")
// higher is more critical
_criticalityEnum = map[Criticality]int{
CriticalPlus: 40,
Critical: 30,
SheddablePlus: 20,
Sheddable: 10,
}
_defaultCriticality = Critical
)
// Value is used to get criticality value, higher value is more critical.
func Value(in Criticality) int {
v, ok := _criticalityEnum[in]
if !ok {
return _criticalityEnum[_defaultCriticality]
}
return v
}
// Higher will compare the input criticality with self, return true if the input is more critical than self.
func (c Criticality) Higher(in Criticality) bool {
return Value(in) > Value(c)
}
// Parse will parse raw criticality string as valid critality. Any invalid input will parse as empty criticality.
func Parse(raw string) Criticality {
crtl := Criticality(raw)
if _, ok := _criticalityEnum[crtl]; ok {
return crtl
}
return EmptyCriticality
}
// Exist is used to check criticality is exist in several enumeration.
func Exist(c Criticality) bool {
_, ok := _criticalityEnum[c]
return ok
}

@ -256,6 +256,11 @@ func (client *Client) Raw(c context.Context, req *xhttp.Request, v ...string) (b
if color := metadata.String(c, metadata.Color); color != "" {
setColor(req, color)
}
metadata.Range(c,
func(key string, value interface{}) {
setMetadata(req, key, value)
},
metadata.IsOutgoingKey)
if resp, err = client.client.Do(req); err != nil {
err = pkgerr.Wrapf(err, "host:%s, url:%s", req.URL.Host, realURL(req))
code = "failed"

@ -0,0 +1,21 @@
package blademaster
import (
criticalityPkg "github.com/bilibili/kratos/pkg/net/criticality"
"github.com/bilibili/kratos/pkg/net/metadata"
"github.com/pkg/errors"
)
// Criticality is
func Criticality(pathCriticality criticalityPkg.Criticality) HandlerFunc {
if !criticalityPkg.Exist(pathCriticality) {
panic(errors.Errorf("This criticality is not exist: %s", pathCriticality))
}
return func(ctx *Context) {
md, ok := metadata.FromContext(ctx)
if ok {
md[metadata.Criticality] = string(pathCriticality)
}
}
}

@ -1,6 +1,7 @@
package blademaster
import (
"fmt"
"net/http"
"strconv"
"strings"
@ -8,6 +9,8 @@ import (
"github.com/bilibili/kratos/pkg/conf/env"
"github.com/bilibili/kratos/pkg/log"
criticalityPkg "github.com/bilibili/kratos/pkg/net/criticality"
"github.com/bilibili/kratos/pkg/net/metadata"
"github.com/pkg/errors"
)
@ -20,8 +23,28 @@ const (
_httpHeaderMirror = "x1-bmspy-mirror"
_httpHeaderRemoteIP = "x-backend-bm-real-ip"
_httpHeaderRemoteIPPort = "x-backend-bm-real-ipport"
_httpHeaderCriticality = "x-backend-bili-criticality"
)
const (
_httpHeaderMetadata = "x-bili-metadata-"
)
var _outgoingHeader = map[string]string{
metadata.Color: _httpHeaderColor,
metadata.Criticality: _httpHeaderCriticality,
metadata.Mirror: _httpHeaderMirror,
}
func setMetadata(req *http.Request, key string, value interface{}) {
strV, ok := value.(string)
if !ok {
return
}
header := fmt.Sprintf("%s%s", _httpHeaderMetadata, strings.ReplaceAll(key, "_", "-"))
req.Header.Set(header, strV)
}
// mirror return true if x-bmspy-mirror in http header and its value is 1 or true.
func mirror(req *http.Request) bool {
mirrorStr := req.Header.Get(_httpHeaderMirror)
@ -79,6 +102,12 @@ func timeout(req *http.Request) time.Duration {
return time.Duration(timeout) * time.Millisecond
}
// criticality get criticality from http request.
func criticality(req *http.Request) criticalityPkg.Criticality {
raw := req.Header.Get(_httpHeaderCriticality)
return criticalityPkg.Parse(raw)
}
// remoteIP implements a best effort algorithm to return the real client IP, it parses
// x-backend-bm-real-ip or X-Real-IP or X-Forwarded-For in order to work properly with reverse-proxies such us: nginx or haproxy.
// Use X-Forwarded-For before X-Real-Ip as nginx uses X-Real-Ip with the proxy's IP.

@ -15,6 +15,7 @@ import (
"github.com/bilibili/kratos/pkg/conf/dsn"
"github.com/bilibili/kratos/pkg/log"
criticalityPkg "github.com/bilibili/kratos/pkg/net/criticality"
"github.com/bilibili/kratos/pkg/net/ip"
"github.com/bilibili/kratos/pkg/net/metadata"
"github.com/bilibili/kratos/pkg/stat"
@ -261,11 +262,15 @@ func (engine *Engine) handleContext(c *Context) {
tm = ctm
}
md := metadata.MD{
metadata.Color: color(req),
metadata.RemoteIP: remoteIP(req),
metadata.RemotePort: remotePort(req),
metadata.Caller: caller(req),
metadata.Mirror: mirror(req),
metadata.Color: color(req),
metadata.RemoteIP: remoteIP(req),
metadata.RemotePort: remotePort(req),
metadata.Caller: caller(req),
metadata.Mirror: mirror(req),
metadata.Criticality: string(criticalityPkg.Critical),
}
if crtl := criticality(req); crtl != criticalityPkg.EmptyCriticality {
md[metadata.Criticality] = string(crtl)
}
ctx := metadata.NewContext(context.Background(), md)
if tm > 0 {

@ -0,0 +1,137 @@
package blademaster
import (
"context"
"fmt"
"io/ioutil"
"net/http"
"sync"
"sync/atomic"
"testing"
"time"
criticalityPkg "github.com/bilibili/kratos/pkg/net/criticality"
"github.com/bilibili/kratos/pkg/net/metadata"
xtime "github.com/bilibili/kratos/pkg/time"
"github.com/stretchr/testify/assert"
)
var (
sonce sync.Once
SockAddr = "localhost:18080"
curEngine atomic.Value
)
func uri(base, path string) string {
return fmt.Sprintf("%s://%s%s", "http", base, path)
}
func shutdown() {
if err := curEngine.Load().(*Engine).Shutdown(context.TODO()); err != nil {
panic(err)
}
}
func setupHandler(engine *Engine) {
// set the global timeout is 2 second
engine.conf.Timeout = xtime.Duration(time.Second * 2)
engine.Ping(func(ctx *Context) {
ctx.AbortWithStatus(200)
})
engine.GET("/criticality/api", Criticality(criticalityPkg.Critical), func(ctx *Context) {
ctx.String(200, "%s", metadata.String(ctx, metadata.Criticality))
})
engine.GET("/criticality/none/api", func(ctx *Context) {
ctx.String(200, "%s", metadata.String(ctx, metadata.Criticality))
})
}
func startServer() {
e := DefaultServer(nil)
setupHandler(e)
go e.Run(SockAddr)
curEngine.Store(e)
time.Sleep(time.Second)
}
func TestCriticality(t *testing.T) {
startServer()
defer shutdown()
tests := []*struct {
path string
crtl criticalityPkg.Criticality
expected criticalityPkg.Criticality
}{
{
"/criticality/api",
criticalityPkg.EmptyCriticality,
criticalityPkg.Critical,
},
{
"/criticality/api",
criticalityPkg.CriticalPlus,
criticalityPkg.Critical,
},
{
"/criticality/api",
criticalityPkg.SheddablePlus,
criticalityPkg.Critical,
},
}
client := &http.Client{}
for _, testCase := range tests {
req, err := http.NewRequest("GET", uri(SockAddr, testCase.path), nil)
assert.NoError(t, err)
req.Header.Set(_httpHeaderCriticality, string(testCase.crtl))
resp, err := client.Do(req)
assert.NoError(t, err)
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
assert.NoError(t, err)
assert.Equal(t, testCase.expected, criticalityPkg.Criticality(body))
}
}
func TestNoneCriticality(t *testing.T) {
startServer()
defer shutdown()
tests := []*struct {
path string
crtl criticalityPkg.Criticality
expected criticalityPkg.Criticality
}{
{
"/criticality/none/api",
criticalityPkg.EmptyCriticality,
criticalityPkg.Critical,
},
{
"/criticality/none/api",
criticalityPkg.CriticalPlus,
criticalityPkg.CriticalPlus,
},
{
"/criticality/none/api",
criticalityPkg.SheddablePlus,
criticalityPkg.SheddablePlus,
},
}
client := &http.Client{}
for _, testCase := range tests {
req, err := http.NewRequest("GET", uri(SockAddr, testCase.path), nil)
assert.NoError(t, err)
req.Header.Set(_httpHeaderCriticality, string(testCase.crtl))
resp, err := client.Do(req)
assert.NoError(t, err)
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
assert.NoError(t, err)
assert.Equal(t, testCase.expected, criticalityPkg.Criticality(body))
}
}

@ -36,13 +36,17 @@ const (
// Device 客户端信息
Device = "device"
// Criticality 重要性
Criticality = "criticality"
)
var outgoingKey = map[string]struct{}{
Color: struct{}{},
RemoteIP: struct{}{},
RemotePort: struct{}{},
Mirror: struct{}{},
Color: struct{}{},
RemoteIP: struct{}{},
RemotePort: struct{}{},
Mirror: struct{}{},
Criticality: struct{}{},
}
var incomingKey = map[string]struct{}{

Loading…
Cancel
Save