From 0a076443cba110af07f4592eebf909905459c828 Mon Sep 17 00:00:00 2001 From: YuanXin Hu Date: Mon, 13 Feb 2023 11:38:22 +0800 Subject: [PATCH] feat: support polaris ratelimit ability (#2586) --- contrib/polaris/go.mod | 2 +- contrib/polaris/limiter.go | 105 +++++++++++++++++++++++++++++++++++ contrib/polaris/polaris.go | 13 +++++ contrib/polaris/ratelimit.go | 53 ++++++++++++++++++ 4 files changed, 172 insertions(+), 1 deletion(-) create mode 100644 contrib/polaris/limiter.go create mode 100644 contrib/polaris/ratelimit.go diff --git a/contrib/polaris/go.mod b/contrib/polaris/go.mod index 168ea5c38..3f2e6be78 100644 --- a/contrib/polaris/go.mod +++ b/contrib/polaris/go.mod @@ -3,6 +3,7 @@ module github.com/go-kratos/kratos/contrib/polaris/v2 go 1.18 require ( + github.com/go-kratos/aegis v0.1.4 github.com/go-kratos/kratos/v2 v2.5.3 github.com/google/uuid v1.3.0 github.com/polarismesh/polaris-go v1.3.0 @@ -13,7 +14,6 @@ require ( github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/dlclark/regexp2 v1.7.0 // indirect - github.com/go-kratos/aegis v0.1.4 // indirect github.com/go-playground/form/v4 v4.2.0 // indirect github.com/golang/protobuf v1.5.2 // indirect github.com/gorilla/mux v1.8.0 // indirect diff --git a/contrib/polaris/limiter.go b/contrib/polaris/limiter.go new file mode 100644 index 000000000..9f74beb5e --- /dev/null +++ b/contrib/polaris/limiter.go @@ -0,0 +1,105 @@ +package polaris + +import ( + "time" + + "github.com/go-kratos/aegis/ratelimit" + + "github.com/polarismesh/polaris-go" + "github.com/polarismesh/polaris-go/pkg/model" +) + +type ( + // LimiterOption function for polaris limiter + LimiterOption func(*limiterOptions) +) + +type limiterOptions struct { + // required, polaris limit namespace + namespace string + + // required, polaris limit service name + service string + + // optional, polaris limit request timeout + // max value is (1+RetryCount) * Timeout + timeout time.Duration + + // optional, polaris limit retryCount + // init by polaris config + retryCount int + + // optional, request limit quota + token uint32 +} + +// WithLimiterNamespace with limiter namespace. +func WithLimiterNamespace(namespace string) LimiterOption { + return func(o *limiterOptions) { + o.namespace = namespace + } +} + +// WithLimiterService with limiter service. +func WithLimiterService(service string) LimiterOption { + return func(o *limiterOptions) { + o.service = service + } +} + +// WithLimiterTimeout with limiter arguments. +func WithLimiterTimeout(timeout time.Duration) LimiterOption { + return func(o *limiterOptions) { + o.timeout = timeout + } +} + +// WithLimiterRetryCount with limiter retryCount. +func WithLimiterRetryCount(retryCount int) LimiterOption { + return func(o *limiterOptions) { + o.retryCount = retryCount + } +} + +// WithLimiterToken with limiter token. +func WithLimiterToken(token uint32) LimiterOption { + return func(o *limiterOptions) { + o.token = token + } +} + +type Limiter struct { + // polaris limit api + limitAPI polaris.LimitAPI + + opts limiterOptions +} + +// init quotaRequest +func buildRequest(opts limiterOptions) polaris.QuotaRequest { + quotaRequest := polaris.NewQuotaRequest() + quotaRequest.SetNamespace(opts.namespace) + quotaRequest.SetRetryCount(opts.retryCount) + quotaRequest.SetService(opts.service) + quotaRequest.SetTimeout(opts.timeout) + quotaRequest.SetToken(opts.token) + return quotaRequest +} + +// Allow interface impl +func (l *Limiter) Allow(method string, argument ...model.Argument) (ratelimit.DoneFunc, error) { + request := buildRequest(l.opts) + request.SetMethod(method) + for _, arg := range argument { + request.AddArgument(arg) + } + resp, err := l.limitAPI.GetQuota(request) + if err != nil { + // ignore err + return func(ratelimit.DoneInfo) {}, nil + } + if resp.Get().Code == model.QuotaResultOk { + return func(ratelimit.DoneInfo) {}, nil + } + return nil, ratelimit.ErrLimitExceed +} diff --git a/contrib/polaris/polaris.go b/contrib/polaris/polaris.go index ec9476539..ec34e9b72 100644 --- a/contrib/polaris/polaris.go +++ b/contrib/polaris/polaris.go @@ -77,3 +77,16 @@ func (p *Polaris) Registry(opts ...RegistryOption) (r *Registry) { consumer: p.discovery, } } + +func (p *Polaris) Limiter(opts ...LimiterOption) (r *Limiter) { + op := limiterOptions{ + namespace: p.namespace, + } + for _, option := range opts { + option(&op) + } + return &Limiter{ + limitAPI: p.limit, + opts: op, + } +} diff --git a/contrib/polaris/ratelimit.go b/contrib/polaris/ratelimit.go new file mode 100644 index 000000000..02e0c4954 --- /dev/null +++ b/contrib/polaris/ratelimit.go @@ -0,0 +1,53 @@ +package polaris + +import ( + "context" + "strings" + + "github.com/go-kratos/aegis/ratelimit" + + "github.com/go-kratos/kratos/v2/errors" + "github.com/go-kratos/kratos/v2/middleware" + "github.com/go-kratos/kratos/v2/transport" + "github.com/go-kratos/kratos/v2/transport/http" + + "github.com/polarismesh/polaris-go/pkg/model" +) + +// ErrLimitExceed is service unavailable due to rate limit exceeded. +var ( + ErrLimitExceed = errors.New(429, "RATELIMIT", "service unavailable due to rate limit exceeded") +) + +// Ratelimit ratelimiter middleware +func Ratelimit(l Limiter) middleware.Middleware { + return func(handler middleware.Handler) middleware.Handler { + return func(ctx context.Context, req interface{}) (reply interface{}, err error) { + if tr, ok := transport.FromServerContext(ctx); ok { + var args []model.Argument + headers := tr.RequestHeader() + // handle header + for _, header := range headers.Keys() { + args = append(args, model.BuildHeaderArgument(header, headers.Get(header))) + } + // handle http + if ht, ok := tr.(*http.Transport); ok { + // url query + for key, values := range ht.Request().URL.Query() { + args = append(args, model.BuildQueryArgument(key, strings.Join(values, ","))) + } + } + done, e := l.Allow(tr.Operation(), args...) + if e != nil { + // rejected + return nil, ErrLimitExceed + } + // allowed + reply, err = handler(ctx, req) + done(ratelimit.DoneInfo{Err: err}) + return + } + return nil, errors.New(400, "Error with transport.FromServerContext", "Error with transport.FromServerContext") + } + } +}