From feeec630d7591aa5c4a64d3e4b7122f081e763b7 Mon Sep 17 00:00:00 2001 From: longxboy Date: Thu, 23 Sep 2021 17:42:21 +0800 Subject: [PATCH] feat: add selector and filters examples (#1485) * add selector examples and filters --- examples/selector/client/main.go | 82 +++++++++++++++++++++++ examples/selector/server/main.go | 79 ++++++++++++++++++++++ selector/default.go | 5 ++ selector/p2c/p2c.go | 22 +++++- selector/random/random.go | 31 +++++++-- selector/wrr/wrr.go | 82 +++++++++++++++++++++++ transport/grpc/{balancer => }/balancer.go | 14 +++- transport/grpc/client.go | 45 +++++++++---- transport/grpc/client_test.go | 2 +- transport/grpc/transport.go | 7 ++ transport/http/client.go | 4 +- 11 files changed, 348 insertions(+), 25 deletions(-) create mode 100644 examples/selector/client/main.go create mode 100644 examples/selector/server/main.go create mode 100644 selector/wrr/wrr.go rename transport/grpc/{balancer => }/balancer.go (85%) diff --git a/examples/selector/client/main.go b/examples/selector/client/main.go new file mode 100644 index 000000000..3641c6325 --- /dev/null +++ b/examples/selector/client/main.go @@ -0,0 +1,82 @@ +package main + +import ( + "context" + "log" + "time" + + "github.com/go-kratos/kratos/contrib/registry/consul/v2" + "github.com/go-kratos/kratos/examples/helloworld/helloworld" + "github.com/go-kratos/kratos/v2/middleware/recovery" + "github.com/go-kratos/kratos/v2/selector/filter" + "github.com/go-kratos/kratos/v2/selector/p2c" + "github.com/go-kratos/kratos/v2/selector/wrr" + "github.com/go-kratos/kratos/v2/transport/grpc" + "github.com/go-kratos/kratos/v2/transport/http" + "github.com/hashicorp/consul/api" +) + +func main() { + consulCli, err := api.NewClient(api.DefaultConfig()) + if err != nil { + panic(err) + } + r := consul.New(consulCli) + + // new grpc client + conn, err := grpc.DialInsecure( + context.Background(), + grpc.WithEndpoint("discovery:///helloworld"), + grpc.WithDiscovery(r), + // 由于gRPC框架的限制只能使用全局balancer+filter的方式来实现selector + // 这里使用weighted round robin算法的balancer+静态version=1.0.0的Filter + grpc.WithBalancerName(wrr.Name), + grpc.WithSelectFilter(filter.Version("1.0.0")), + ) + if err != nil { + log.Fatal(err) + } + defer conn.Close() + gClient := helloworld.NewGreeterClient(conn) + + // new http client + hConn, err := http.NewClient( + context.Background(), + http.WithMiddleware( + recovery.Recovery(), + ), + http.WithEndpoint("discovery:///helloworld"), + http.WithDiscovery(r), + // 这里使用p2c算法的balancer+静态version=2.0.0的Filter组成一个selector + http.WithSelector( + p2c.New(p2c.WithFilter(filter.Version("2.0.0"))), + ), + ) + if err != nil { + log.Fatal(err) + } + defer hConn.Close() + hClient := helloworld.NewGreeterHTTPClient(hConn) + + for { + time.Sleep(time.Second) + callGRPC(gClient) + callHTTP(hClient) + } +} + +func callGRPC(client helloworld.GreeterClient) { + reply, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "kratos"}) + if err != nil { + log.Fatal(err) + } + log.Printf("[grpc] SayHello %+v\n", reply) +} + +func callHTTP(client helloworld.GreeterHTTPClient) { + reply, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "kratos"}) + if err != nil { + log.Fatal(err) + } + log.Printf("[http] SayHello %s\n", reply.Message) +} diff --git a/examples/selector/server/main.go b/examples/selector/server/main.go new file mode 100644 index 000000000..247bdc4ca --- /dev/null +++ b/examples/selector/server/main.go @@ -0,0 +1,79 @@ +package main + +import ( + "context" + "fmt" + "os" + + "github.com/go-kratos/kratos/contrib/registry/consul/v2" + "github.com/go-kratos/kratos/examples/helloworld/helloworld" + "github.com/go-kratos/kratos/v2" + "github.com/go-kratos/kratos/v2/log" + "github.com/go-kratos/kratos/v2/middleware/logging" + "github.com/go-kratos/kratos/v2/middleware/recovery" + "github.com/go-kratos/kratos/v2/transport/grpc" + "github.com/go-kratos/kratos/v2/transport/http" + "github.com/hashicorp/consul/api" +) + +// server is used to implement helloworld.GreeterServer. +type server struct { + helloworld.UnimplementedGreeterServer +} + +// SayHello implements helloworld.GreeterServer +func (s *server) SayHello(ctx context.Context, in *helloworld.HelloRequest) (*helloworld.HelloReply, error) { + return &helloworld.HelloReply{Message: fmt.Sprintf("Welcome %+v!", in.Name)}, nil +} + +func main() { + logger := log.NewStdLogger(os.Stdout) + + consulClient, err := api.NewClient(api.DefaultConfig()) + if err != nil { + log.NewHelper(logger).Fatal(err) + } + go runServer("1.0.0", logger, consulClient, 8000) + go runServer("1.0.0", logger, consulClient, 8010) + + runServer("2.0.0", logger, consulClient, 8020) +} + +func runServer(version string, logger log.Logger, client *api.Client, port int) { + logger = log.With(logger, "version", version, "port:", port) + log := log.NewHelper(logger) + + httpSrv := http.NewServer( + http.Address(fmt.Sprintf(":%d", port)), + http.Middleware( + recovery.Recovery(), + logging.Server(logger), + ), + ) + grpcSrv := grpc.NewServer( + grpc.Address(fmt.Sprintf(":%d", port+1000)), + grpc.Middleware( + recovery.Recovery(), + logging.Server(logger), + ), + ) + + s := &server{} + helloworld.RegisterGreeterServer(grpcSrv, s) + helloworld.RegisterGreeterHTTPServer(httpSrv, s) + + r := consul.New(client) + app := kratos.New( + kratos.Name("helloworld"), + kratos.Server( + grpcSrv, + httpSrv, + ), + kratos.Version(version), + kratos.Registrar(r), + ) + + if err := app.Run(); err != nil { + log.Fatal(err) + } +} diff --git a/selector/default.go b/selector/default.go index 63a19bb45..25698e718 100644 --- a/selector/default.go +++ b/selector/default.go @@ -9,6 +9,7 @@ import ( type Default struct { NodeBuilder WeightedNodeBuilder Balancer Balancer + Filters []Filter lk sync.RWMutex weightedNodes []Node @@ -19,6 +20,10 @@ func (d *Default) Select(ctx context.Context, opts ...SelectOption) (selected No d.lk.RLock() weightedNodes := d.weightedNodes d.lk.RUnlock() + + for _, f := range d.Filters { + weightedNodes = f(ctx, weightedNodes) + } var options SelectOptions for _, o := range opts { o(&options) diff --git a/selector/p2c/p2c.go b/selector/p2c/p2c.go index 88cd3a7b2..817bccd1e 100644 --- a/selector/p2c/p2c.go +++ b/selector/p2c/p2c.go @@ -18,13 +18,33 @@ const ( var _ selector.Balancer = &Balancer{} +// WithFilter with select filters +func WithFilter(filters ...selector.Filter) Option { + return func(o *options) { + o.filters = filters + } +} + +// Option is random builder option. +type Option func(o *options) + +// options is random builder options +type options struct { + filters []selector.Filter +} + // New creates a p2c selector. -func New() selector.Selector { +func New(opts ...Option) selector.Selector { + var option options + for _, opt := range opts { + opt(&option) + } return &selector.Default{ NodeBuilder: &ewma.Builder{}, Balancer: &Balancer{ r: rand.New(rand.NewSource(time.Now().UnixNano())), }, + Filters: option.filters, } } diff --git a/selector/random/random.go b/selector/random/random.go index 30e33d4da..14a194df7 100644 --- a/selector/random/random.go +++ b/selector/random/random.go @@ -8,21 +8,42 @@ import ( "github.com/go-kratos/kratos/v2/selector/node/direct" ) -var ( - _ selector.Balancer = &Balancer{} - - // Name is balancer name +const ( + // Name is random balancer name Name = "random" ) +var _ selector.Balancer = &Balancer{} // Name is balancer name + +// WithFilter with select filters +func WithFilter(filters ...selector.Filter) Option { + return func(o *options) { + o.filters = filters + } +} + +// Option is random builder option. +type Option func(o *options) + +// options is random builder options +type options struct { + filters []selector.Filter +} + // Balancer is a random balancer. type Balancer struct{} // New random a selector. -func New() selector.Selector { +func New(opts ...Option) selector.Selector { + var option options + for _, opt := range opts { + opt(&option) + } + return &selector.Default{ Balancer: &Balancer{}, NodeBuilder: &direct.Builder{}, + Filters: option.filters, } } diff --git a/selector/wrr/wrr.go b/selector/wrr/wrr.go new file mode 100644 index 000000000..e5898daf4 --- /dev/null +++ b/selector/wrr/wrr.go @@ -0,0 +1,82 @@ +package wrr + +import ( + "context" + "sync" + + "github.com/go-kratos/kratos/v2/selector" + "github.com/go-kratos/kratos/v2/selector/node/direct" +) + +const ( + // Name is wrr balancer name + Name = "wrr" +) + +var _ selector.Balancer = &Balancer{} // Name is balancer name + +// WithFilter with select filters +func WithFilter(filters ...selector.Filter) Option { + return func(o *options) { + o.filters = filters + } +} + +// Option is random builder option. +type Option func(o *options) + +// options is random builder options +type options struct { + filters []selector.Filter +} + +// Balancer is a random balancer. +type Balancer struct { + mu sync.Mutex + currentWeight map[string]float64 +} + +// New random a selector. +func New(opts ...Option) selector.Selector { + var option options + for _, opt := range opts { + opt(&option) + } + + return &selector.Default{ + Balancer: &Balancer{ + currentWeight: make(map[string]float64), + }, + NodeBuilder: &direct.Builder{}, + Filters: option.filters, + } +} + +// Pick pick a weighted node. +func (p *Balancer) Pick(_ context.Context, nodes []selector.WeightedNode) (selector.WeightedNode, selector.DoneFunc, error) { + if len(nodes) == 0 { + return nil, nil, selector.ErrNoAvailable + } + var totalWeight float64 + var selected selector.WeightedNode + var selectWeight float64 + + // nginx wrr load balancing algorithm: http://blog.csdn.net/zhangskd/article/details/50194069 + p.mu.Lock() + for _, node := range nodes { + totalWeight += node.Weight() + cwt := p.currentWeight[node.Address()] + // current += effectiveWeight + cwt += node.Weight() + p.currentWeight[node.Address()] = cwt + if selected == nil || selectWeight < cwt { + selectWeight = cwt + selected = node + } + } + p.currentWeight[selected.Address()] = selectWeight - totalWeight + p.mu.Unlock() + + d := selected.Pick() + return selected, d, nil +} diff --git a/transport/grpc/balancer/balancer.go b/transport/grpc/balancer.go similarity index 85% rename from transport/grpc/balancer/balancer.go rename to transport/grpc/balancer.go index 573f9b7a6..3814742fe 100644 --- a/transport/grpc/balancer/balancer.go +++ b/transport/grpc/balancer.go @@ -1,4 +1,4 @@ -package balancer +package grpc import ( "sync" @@ -8,6 +8,8 @@ import ( "github.com/go-kratos/kratos/v2/selector/node" "github.com/go-kratos/kratos/v2/selector/p2c" "github.com/go-kratos/kratos/v2/selector/random" + "github.com/go-kratos/kratos/v2/selector/wrr" + "github.com/go-kratos/kratos/v2/transport" gBalancer "google.golang.org/grpc/balancer" "google.golang.org/grpc/balancer/base" @@ -24,6 +26,7 @@ var ( func init() { // inject global grpc balancer SetGlobalBalancer(random.Name, random.New()) + SetGlobalBalancer(wrr.Name, wrr.New()) SetGlobalBalancer(p2c.Name, p2c.New()) } @@ -74,7 +77,14 @@ type Picker struct { // Pick pick instances. func (p *Picker) Pick(info gBalancer.PickInfo) (gBalancer.PickResult, error) { - n, done, err := p.selector.Select(info.Ctx) + var filters []selector.Filter + if tr, ok := transport.FromClientContext(info.Ctx); ok { + if gtr, ok := tr.(*Transport); ok { + filters = gtr.Filters() + } + } + + n, done, err := p.selector.Select(info.Ctx, selector.WithFilter(filters...)) if err != nil { return gBalancer.PickResult{}, err } diff --git a/transport/grpc/client.go b/transport/grpc/client.go index fa33c7070..dd1a614b3 100644 --- a/transport/grpc/client.go +++ b/transport/grpc/client.go @@ -8,14 +8,13 @@ import ( "github.com/go-kratos/kratos/v2/middleware" "github.com/go-kratos/kratos/v2/registry" - "github.com/go-kratos/kratos/v2/selector/random" + "github.com/go-kratos/kratos/v2/selector" + "github.com/go-kratos/kratos/v2/selector/wrr" "github.com/go-kratos/kratos/v2/transport" "github.com/go-kratos/kratos/v2/transport/grpc/resolver/discovery" // init resolver _ "github.com/go-kratos/kratos/v2/transport/grpc/resolver/direct" - // init balancer - _ "github.com/go-kratos/kratos/v2/transport/grpc/balancer" "google.golang.org/grpc" "google.golang.org/grpc/credentials" @@ -74,15 +73,31 @@ func WithOptions(opts ...grpc.DialOption) ClientOption { } } +// WithBalancerName with balancer name +func WithBalancerName(name string) ClientOption { + return func(o *clientOptions) { + o.balancerName = name + } +} + +// WithSelectFilter with select filters +func WithSelectFilter(filters ...selector.Filter) ClientOption { + return func(o *clientOptions) { + o.filters = filters + } +} + // clientOptions is gRPC Client type clientOptions struct { - endpoint string - tlsConf *tls.Config - timeout time.Duration - discovery registry.Discovery - middleware []middleware.Middleware - ints []grpc.UnaryClientInterceptor - grpcOpts []grpc.DialOption + endpoint string + tlsConf *tls.Config + timeout time.Duration + discovery registry.Discovery + middleware []middleware.Middleware + ints []grpc.UnaryClientInterceptor + grpcOpts []grpc.DialOption + balancerName string + filters []selector.Filter } // Dial returns a GRPC connection. @@ -97,19 +112,20 @@ func DialInsecure(ctx context.Context, opts ...ClientOption) (*grpc.ClientConn, func dial(ctx context.Context, insecure bool, opts ...ClientOption) (*grpc.ClientConn, error) { options := clientOptions{ - timeout: 2000 * time.Millisecond, + timeout: 2000 * time.Millisecond, + balancerName: wrr.Name, } for _, o := range opts { o(&options) } ints := []grpc.UnaryClientInterceptor{ - unaryClientInterceptor(options.middleware, options.timeout), + unaryClientInterceptor(options.middleware, options.timeout, options.filters), } if len(options.ints) > 0 { ints = append(ints, options.ints...) } grpcOpts := []grpc.DialOption{ - grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, random.Name)), + grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, options.balancerName)), grpc.WithChainUnaryInterceptor(ints...), } if options.discovery != nil { @@ -127,12 +143,13 @@ func dial(ctx context.Context, insecure bool, opts ...ClientOption) (*grpc.Clien return grpc.DialContext(ctx, options.endpoint, grpcOpts...) } -func unaryClientInterceptor(ms []middleware.Middleware, timeout time.Duration) grpc.UnaryClientInterceptor { +func unaryClientInterceptor(ms []middleware.Middleware, timeout time.Duration, filters []selector.Filter) grpc.UnaryClientInterceptor { return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { ctx = transport.NewClientContext(ctx, &Transport{ endpoint: cc.Target(), operation: method, reqHeader: headerCarrier{}, + filters: filters, }) if timeout > 0 { var cancel context.CancelFunc diff --git a/transport/grpc/client_test.go b/transport/grpc/client_test.go index f18b61833..f618d9bf1 100644 --- a/transport/grpc/client_test.go +++ b/transport/grpc/client_test.go @@ -68,7 +68,7 @@ func EmptyMiddleware() middleware.Middleware { } func TestUnaryClientInterceptor(t *testing.T) { - f := unaryClientInterceptor([]middleware.Middleware{EmptyMiddleware()}, time.Duration(100)) + f := unaryClientInterceptor([]middleware.Middleware{EmptyMiddleware()}, time.Duration(100), nil) req := &struct{}{} resp := &struct{}{} diff --git a/transport/grpc/transport.go b/transport/grpc/transport.go index 3f07e7443..eb8c51272 100644 --- a/transport/grpc/transport.go +++ b/transport/grpc/transport.go @@ -1,6 +1,7 @@ package grpc import ( + "github.com/go-kratos/kratos/v2/selector" "github.com/go-kratos/kratos/v2/transport" "google.golang.org/grpc/metadata" ) @@ -13,6 +14,7 @@ type Transport struct { operation string reqHeader headerCarrier replyHeader headerCarrier + filters []selector.Filter } // Kind returns the transport kind. @@ -40,6 +42,11 @@ func (tr *Transport) ReplyHeader() transport.Header { return tr.replyHeader } +// Filters returns the client select filters. +func (tr *Transport) Filters() []selector.Filter { + return tr.filters +} + type headerCarrier metadata.MD // Get returns the value associated with the passed key. diff --git a/transport/http/client.go b/transport/http/client.go index 02542498c..fcd29d2cb 100644 --- a/transport/http/client.go +++ b/transport/http/client.go @@ -17,7 +17,7 @@ import ( "github.com/go-kratos/kratos/v2/middleware" "github.com/go-kratos/kratos/v2/registry" "github.com/go-kratos/kratos/v2/selector" - "github.com/go-kratos/kratos/v2/selector/random" + "github.com/go-kratos/kratos/v2/selector/wrr" "github.com/go-kratos/kratos/v2/transport" ) @@ -152,7 +152,7 @@ func NewClient(ctx context.Context, opts ...ClientOption) (*Client, error) { decoder: DefaultResponseDecoder, errorDecoder: DefaultErrorDecoder, transport: http.DefaultTransport, - selector: random.New(), + selector: wrr.New(), } for _, o := range opts { o(&options)