From 27eadd83b4e23ed03e3f053bf23e7426a092eb51 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8C=85=E5=AD=90?= Date: Mon, 20 Feb 2023 22:56:48 +0800 Subject: [PATCH] fix: polaris get service name (#2615) --- contrib/polaris/polaris.go | 13 ++++- contrib/polaris/ratelimit.go | 4 +- contrib/polaris/router.go | 110 ++++++++++++++++++++--------------- 3 files changed, 77 insertions(+), 50 deletions(-) diff --git a/contrib/polaris/polaris.go b/contrib/polaris/polaris.go index ec34e9b72..15da49f16 100644 --- a/contrib/polaris/polaris.go +++ b/contrib/polaris/polaris.go @@ -16,6 +16,7 @@ type Polaris struct { registry polaris.ProviderAPI discovery polaris.ConsumerAPI namespace string + service string } // Option is polaris option. @@ -28,6 +29,13 @@ func WithNamespace(ns string) Option { } } +// WithService set the current service name +func WithService(service string) Option { + return func(o *Polaris) { + o.service = service + } +} + // New polaris Service governance. func New(sdk api.SDKContext, opts ...Option) Polaris { op := Polaris{ @@ -78,14 +86,15 @@ func (p *Polaris) Registry(opts ...RegistryOption) (r *Registry) { } } -func (p *Polaris) Limiter(opts ...LimiterOption) (r *Limiter) { +func (p *Polaris) Limiter(opts ...LimiterOption) (r Limiter) { op := limiterOptions{ namespace: p.namespace, + service: p.service, } for _, option := range opts { option(&op) } - return &Limiter{ + return Limiter{ limitAPI: p.limit, opts: op, } diff --git a/contrib/polaris/ratelimit.go b/contrib/polaris/ratelimit.go index 02e0c4954..f59938e9a 100644 --- a/contrib/polaris/ratelimit.go +++ b/contrib/polaris/ratelimit.go @@ -19,7 +19,7 @@ var ( ErrLimitExceed = errors.New(429, "RATELIMIT", "service unavailable due to rate limit exceeded") ) -// Ratelimit ratelimiter middleware +// Ratelimit Request rate limit middleware func Ratelimit(l Limiter) middleware.Middleware { return func(handler middleware.Handler) middleware.Handler { return func(ctx context.Context, req interface{}) (reply interface{}, err error) { @@ -47,7 +47,7 @@ func Ratelimit(l Limiter) middleware.Middleware { done(ratelimit.DoneInfo{Err: err}) return } - return nil, errors.New(400, "Error with transport.FromServerContext", "Error with transport.FromServerContext") + return reply, nil } } } diff --git a/contrib/polaris/router.go b/contrib/polaris/router.go index de2d714f2..958db4ab4 100644 --- a/contrib/polaris/router.go +++ b/contrib/polaris/router.go @@ -21,71 +21,89 @@ import ( "github.com/go-kratos/kratos/v2/transport/http" ) +type router struct { + service string +} + +type RouterOption func(o *router) + +// WithRouterService set the caller service name used by the route +func WithRouterService(service string) RouterOption { + return func(o *router) { + o.service = service + } +} + // NodeFilter polaris dynamic router selector -func (p *Polaris) NodeFilter() selector.NodeFilter { +func (p *Polaris) NodeFilter(opts ...RouterOption) selector.NodeFilter { + o := router{service: p.service} + for _, opt := range opts { + opt(&o) + } return func(ctx context.Context, nodes []selector.Node) []selector.Node { if len(nodes) == 0 { return nodes } + req := &polaris.ProcessRoutersRequest{ + ProcessRoutersRequest: model.ProcessRoutersRequest{ + SourceService: model.ServiceInfo{Namespace: p.namespace, Service: o.service}, + DstInstances: buildPolarisInstance(p.namespace, nodes), + }, + } if appInfo, ok := kratos.FromContext(ctx); ok { - req := &polaris.ProcessRoutersRequest{ - ProcessRoutersRequest: model.ProcessRoutersRequest{ - SourceService: model.ServiceInfo{ - Service: appInfo.Name(), - Namespace: p.namespace, - }, - DstInstances: buildPolarisInstance(p.namespace, nodes), - }, - } - req.AddArguments(model.BuildCallerServiceArgument(p.namespace, appInfo.Name())) + req.SourceService.Service = appInfo.Name() + } - // process transport - if tr, ok := transport.FromServerContext(ctx); ok { - req.AddArguments(model.BuildMethodArgument(tr.Operation())) - req.AddArguments(model.BuildPathArgument(tr.Operation())) + req.AddArguments(model.BuildCallerServiceArgument(p.namespace, req.ProcessRoutersRequest.SourceService.Service)) - for _, key := range tr.RequestHeader().Keys() { - req.AddArguments(model.BuildHeaderArgument(key, tr.RequestHeader().Get(key))) - } + // process transport + if tr, ok := transport.FromClientContext(ctx); ok { + req.AddArguments(model.BuildMethodArgument(tr.Operation())) + req.AddArguments(model.BuildPathArgument(tr.Operation())) + + for _, key := range tr.RequestHeader().Keys() { + req.AddArguments(model.BuildHeaderArgument(strings.ToLower(key), tr.RequestHeader().Get(key))) + } - // http - if ht, ok := tr.(http.Transporter); ok { - req.AddArguments(model.BuildPathArgument(ht.Request().URL.Path)) - req.AddArguments(model.BuildCallerIPArgument(ht.Request().RemoteAddr)) + // http + if ht, ok := tr.(http.Transporter); ok { + req.AddArguments(model.BuildPathArgument(ht.Request().URL.Path)) + req.AddArguments(model.BuildCallerIPArgument(ht.Request().RemoteAddr)) - // cookie - for _, cookie := range ht.Request().Cookies() { - req.AddArguments(model.BuildCookieArgument(cookie.Name, cookie.Value)) - } + // cookie + for _, cookie := range ht.Request().Cookies() { + req.AddArguments(model.BuildCookieArgument(cookie.Name, cookie.Value)) + } - // url query - for key, values := range ht.Request().URL.Query() { - req.AddArguments(model.BuildQueryArgument(key, strings.Join(values, ","))) - } + // url query + for key, values := range ht.Request().URL.Query() { + req.AddArguments(model.BuildQueryArgument(key, strings.Join(values, ","))) } } + } - n := make(map[string]selector.Node, len(nodes)) + n := make(map[string]selector.Node, len(nodes)) - for _, node := range nodes { - n[node.Address()] = node - } + for _, node := range nodes { + n[node.Address()] = node + } - m, err := p.router.ProcessRouters(req) - if err != nil { - log.Errorf("polaris process routers failed, err=%v", err) - return nodes - } + m, err := p.router.ProcessRouters(req) + if err != nil { + log.Errorf("polaris process routers failed, err=%v", err) + return nodes + } - newNode := make([]selector.Node, 0, len(m.Instances)) - for _, ins := range m.GetInstances() { - if v, ok := n[fmt.Sprintf("%s:%d", ins.GetHost(), ins.GetPort())]; ok { - newNode = append(newNode, v) - } + newNode := make([]selector.Node, 0, len(m.Instances)) + for _, ins := range m.GetInstances() { + if v, ok := n[fmt.Sprintf("%s:%d", ins.GetHost(), ins.GetPort())]; ok { + newNode = append(newNode, v) } - return newNode } - return nodes + if len(newNode) == 0 { + return nodes + } + return newNode } }