feat: add selector and filters examples (#1485)

* add selector examples and filters
pull/1506/head
longxboy 3 years ago committed by GitHub
parent 25f448794d
commit feeec630d7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 82
      examples/selector/client/main.go
  2. 79
      examples/selector/server/main.go
  3. 5
      selector/default.go
  4. 22
      selector/p2c/p2c.go
  5. 31
      selector/random/random.go
  6. 82
      selector/wrr/wrr.go
  7. 14
      transport/grpc/balancer.go
  8. 45
      transport/grpc/client.go
  9. 2
      transport/grpc/client_test.go
  10. 7
      transport/grpc/transport.go
  11. 4
      transport/http/client.go

@ -0,0 +1,82 @@
package main
import (
"context"
"log"
"time"
"github.com/go-kratos/kratos/contrib/registry/consul/v2"
"github.com/go-kratos/kratos/examples/helloworld/helloworld"
"github.com/go-kratos/kratos/v2/middleware/recovery"
"github.com/go-kratos/kratos/v2/selector/filter"
"github.com/go-kratos/kratos/v2/selector/p2c"
"github.com/go-kratos/kratos/v2/selector/wrr"
"github.com/go-kratos/kratos/v2/transport/grpc"
"github.com/go-kratos/kratos/v2/transport/http"
"github.com/hashicorp/consul/api"
)
func main() {
consulCli, err := api.NewClient(api.DefaultConfig())
if err != nil {
panic(err)
}
r := consul.New(consulCli)
// new grpc client
conn, err := grpc.DialInsecure(
context.Background(),
grpc.WithEndpoint("discovery:///helloworld"),
grpc.WithDiscovery(r),
// 由于gRPC框架的限制只能使用全局balancer+filter的方式来实现selector
// 这里使用weighted round robin算法的balancer+静态version=1.0.0的Filter
grpc.WithBalancerName(wrr.Name),
grpc.WithSelectFilter(filter.Version("1.0.0")),
)
if err != nil {
log.Fatal(err)
}
defer conn.Close()
gClient := helloworld.NewGreeterClient(conn)
// new http client
hConn, err := http.NewClient(
context.Background(),
http.WithMiddleware(
recovery.Recovery(),
),
http.WithEndpoint("discovery:///helloworld"),
http.WithDiscovery(r),
// 这里使用p2c算法的balancer+静态version=2.0.0的Filter组成一个selector
http.WithSelector(
p2c.New(p2c.WithFilter(filter.Version("2.0.0"))),
),
)
if err != nil {
log.Fatal(err)
}
defer hConn.Close()
hClient := helloworld.NewGreeterHTTPClient(hConn)
for {
time.Sleep(time.Second)
callGRPC(gClient)
callHTTP(hClient)
}
}
func callGRPC(client helloworld.GreeterClient) {
reply, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "kratos"})
if err != nil {
log.Fatal(err)
}
log.Printf("[grpc] SayHello %+v\n", reply)
}
func callHTTP(client helloworld.GreeterHTTPClient) {
reply, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "kratos"})
if err != nil {
log.Fatal(err)
}
log.Printf("[http] SayHello %s\n", reply.Message)
}

@ -0,0 +1,79 @@
package main
import (
"context"
"fmt"
"os"
"github.com/go-kratos/kratos/contrib/registry/consul/v2"
"github.com/go-kratos/kratos/examples/helloworld/helloworld"
"github.com/go-kratos/kratos/v2"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/middleware/logging"
"github.com/go-kratos/kratos/v2/middleware/recovery"
"github.com/go-kratos/kratos/v2/transport/grpc"
"github.com/go-kratos/kratos/v2/transport/http"
"github.com/hashicorp/consul/api"
)
// server is used to implement helloworld.GreeterServer.
type server struct {
helloworld.UnimplementedGreeterServer
}
// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
return &helloworld.HelloReply{Message: fmt.Sprintf("Welcome %+v!", in.Name)}, nil
}
func main() {
logger := log.NewStdLogger(os.Stdout)
consulClient, err := api.NewClient(api.DefaultConfig())
if err != nil {
log.NewHelper(logger).Fatal(err)
}
go runServer("1.0.0", logger, consulClient, 8000)
go runServer("1.0.0", logger, consulClient, 8010)
runServer("2.0.0", logger, consulClient, 8020)
}
func runServer(version string, logger log.Logger, client *api.Client, port int) {
logger = log.With(logger, "version", version, "port:", port)
log := log.NewHelper(logger)
httpSrv := http.NewServer(
http.Address(fmt.Sprintf(":%d", port)),
http.Middleware(
recovery.Recovery(),
logging.Server(logger),
),
)
grpcSrv := grpc.NewServer(
grpc.Address(fmt.Sprintf(":%d", port+1000)),
grpc.Middleware(
recovery.Recovery(),
logging.Server(logger),
),
)
s := &server{}
helloworld.RegisterGreeterServer(grpcSrv, s)
helloworld.RegisterGreeterHTTPServer(httpSrv, s)
r := consul.New(client)
app := kratos.New(
kratos.Name("helloworld"),
kratos.Server(
grpcSrv,
httpSrv,
),
kratos.Version(version),
kratos.Registrar(r),
)
if err := app.Run(); err != nil {
log.Fatal(err)
}
}

@ -9,6 +9,7 @@ import (
type Default struct {
NodeBuilder WeightedNodeBuilder
Balancer Balancer
Filters []Filter
lk sync.RWMutex
weightedNodes []Node
@ -19,6 +20,10 @@ func (d *Default) Select(ctx context.Context, opts ...SelectOption) (selected No
d.lk.RLock()
weightedNodes := d.weightedNodes
d.lk.RUnlock()
for _, f := range d.Filters {
weightedNodes = f(ctx, weightedNodes)
}
var options SelectOptions
for _, o := range opts {
o(&options)

@ -18,13 +18,33 @@ const (
var _ selector.Balancer = &Balancer{}
// WithFilter with select filters
func WithFilter(filters ...selector.Filter) Option {
return func(o *options) {
o.filters = filters
}
}
// Option is random builder option.
type Option func(o *options)
// options is random builder options
type options struct {
filters []selector.Filter
}
// New creates a p2c selector.
func New() selector.Selector {
func New(opts ...Option) selector.Selector {
var option options
for _, opt := range opts {
opt(&option)
}
return &selector.Default{
NodeBuilder: &ewma.Builder{},
Balancer: &Balancer{
r: rand.New(rand.NewSource(time.Now().UnixNano())),
},
Filters: option.filters,
}
}

@ -8,21 +8,42 @@ import (
"github.com/go-kratos/kratos/v2/selector/node/direct"
)
var (
_ selector.Balancer = &Balancer{}
// Name is balancer name
const (
// Name is random balancer name
Name = "random"
)
var _ selector.Balancer = &Balancer{} // Name is balancer name
// WithFilter with select filters
func WithFilter(filters ...selector.Filter) Option {
return func(o *options) {
o.filters = filters
}
}
// Option is random builder option.
type Option func(o *options)
// options is random builder options
type options struct {
filters []selector.Filter
}
// Balancer is a random balancer.
type Balancer struct{}
// New random a selector.
func New() selector.Selector {
func New(opts ...Option) selector.Selector {
var option options
for _, opt := range opts {
opt(&option)
}
return &selector.Default{
Balancer: &Balancer{},
NodeBuilder: &direct.Builder{},
Filters: option.filters,
}
}

@ -0,0 +1,82 @@
package wrr
import (
"context"
"sync"
"github.com/go-kratos/kratos/v2/selector"
"github.com/go-kratos/kratos/v2/selector/node/direct"
)
const (
// Name is wrr balancer name
Name = "wrr"
)
var _ selector.Balancer = &Balancer{} // Name is balancer name
// WithFilter with select filters
func WithFilter(filters ...selector.Filter) Option {
return func(o *options) {
o.filters = filters
}
}
// Option is random builder option.
type Option func(o *options)
// options is random builder options
type options struct {
filters []selector.Filter
}
// Balancer is a random balancer.
type Balancer struct {
mu sync.Mutex
currentWeight map[string]float64
}
// New random a selector.
func New(opts ...Option) selector.Selector {
var option options
for _, opt := range opts {
opt(&option)
}
return &selector.Default{
Balancer: &Balancer{
currentWeight: make(map[string]float64),
},
NodeBuilder: &direct.Builder{},
Filters: option.filters,
}
}
// Pick pick a weighted node.
func (p *Balancer) Pick(_ context.Context, nodes []selector.WeightedNode) (selector.WeightedNode, selector.DoneFunc, error) {
if len(nodes) == 0 {
return nil, nil, selector.ErrNoAvailable
}
var totalWeight float64
var selected selector.WeightedNode
var selectWeight float64
// nginx wrr load balancing algorithm: http://blog.csdn.net/zhangskd/article/details/50194069
p.mu.Lock()
for _, node := range nodes {
totalWeight += node.Weight()
cwt := p.currentWeight[node.Address()]
// current += effectiveWeight
cwt += node.Weight()
p.currentWeight[node.Address()] = cwt
if selected == nil || selectWeight < cwt {
selectWeight = cwt
selected = node
}
}
p.currentWeight[selected.Address()] = selectWeight - totalWeight
p.mu.Unlock()
d := selected.Pick()
return selected, d, nil
}

@ -1,4 +1,4 @@
package balancer
package grpc
import (
"sync"
@ -8,6 +8,8 @@ import (
"github.com/go-kratos/kratos/v2/selector/node"
"github.com/go-kratos/kratos/v2/selector/p2c"
"github.com/go-kratos/kratos/v2/selector/random"
"github.com/go-kratos/kratos/v2/selector/wrr"
"github.com/go-kratos/kratos/v2/transport"
gBalancer "google.golang.org/grpc/balancer"
"google.golang.org/grpc/balancer/base"
@ -24,6 +26,7 @@ var (
func init() {
// inject global grpc balancer
SetGlobalBalancer(random.Name, random.New())
SetGlobalBalancer(wrr.Name, wrr.New())
SetGlobalBalancer(p2c.Name, p2c.New())
}
@ -74,7 +77,14 @@ type Picker struct {
// Pick pick instances.
func (p *Picker) Pick(info gBalancer.PickInfo) (gBalancer.PickResult, error) {
n, done, err := p.selector.Select(info.Ctx)
var filters []selector.Filter
if tr, ok := transport.FromClientContext(info.Ctx); ok {
if gtr, ok := tr.(*Transport); ok {
filters = gtr.Filters()
}
}
n, done, err := p.selector.Select(info.Ctx, selector.WithFilter(filters...))
if err != nil {
return gBalancer.PickResult{}, err
}

@ -8,14 +8,13 @@ import (
"github.com/go-kratos/kratos/v2/middleware"
"github.com/go-kratos/kratos/v2/registry"
"github.com/go-kratos/kratos/v2/selector/random"
"github.com/go-kratos/kratos/v2/selector"
"github.com/go-kratos/kratos/v2/selector/wrr"
"github.com/go-kratos/kratos/v2/transport"
"github.com/go-kratos/kratos/v2/transport/grpc/resolver/discovery"
// init resolver
_ "github.com/go-kratos/kratos/v2/transport/grpc/resolver/direct"
// init balancer
_ "github.com/go-kratos/kratos/v2/transport/grpc/balancer"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
@ -74,15 +73,31 @@ func WithOptions(opts ...grpc.DialOption) ClientOption {
}
}
// WithBalancerName with balancer name
func WithBalancerName(name string) ClientOption {
return func(o *clientOptions) {
o.balancerName = name
}
}
// WithSelectFilter with select filters
func WithSelectFilter(filters ...selector.Filter) ClientOption {
return func(o *clientOptions) {
o.filters = filters
}
}
// clientOptions is gRPC Client
type clientOptions struct {
endpoint string
tlsConf *tls.Config
timeout time.Duration
discovery registry.Discovery
middleware []middleware.Middleware
ints []grpc.UnaryClientInterceptor
grpcOpts []grpc.DialOption
endpoint string
tlsConf *tls.Config
timeout time.Duration
discovery registry.Discovery
middleware []middleware.Middleware
ints []grpc.UnaryClientInterceptor
grpcOpts []grpc.DialOption
balancerName string
filters []selector.Filter
}
// Dial returns a GRPC connection.
@ -97,19 +112,20 @@ func DialInsecure(ctx context.Context, opts ...ClientOption) (*grpc.ClientConn,
func dial(ctx context.Context, insecure bool, opts ...ClientOption) (*grpc.ClientConn, error) {
options := clientOptions{
timeout: 2000 * time.Millisecond,
timeout: 2000 * time.Millisecond,
balancerName: wrr.Name,
}
for _, o := range opts {
o(&options)
}
ints := []grpc.UnaryClientInterceptor{
unaryClientInterceptor(options.middleware, options.timeout),
unaryClientInterceptor(options.middleware, options.timeout, options.filters),
}
if len(options.ints) > 0 {
ints = append(ints, options.ints...)
}
grpcOpts := []grpc.DialOption{
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, random.Name)),
grpc.WithDefaultServiceConfig(fmt.Sprintf(`{"LoadBalancingPolicy": "%s"}`, options.balancerName)),
grpc.WithChainUnaryInterceptor(ints...),
}
if options.discovery != nil {
@ -127,12 +143,13 @@ func dial(ctx context.Context, insecure bool, opts ...ClientOption) (*grpc.Clien
return grpc.DialContext(ctx, options.endpoint, grpcOpts...)
}
func unaryClientInterceptor(ms []middleware.Middleware, timeout time.Duration) grpc.UnaryClientInterceptor {
func unaryClientInterceptor(ms []middleware.Middleware, timeout time.Duration, filters []selector.Filter) grpc.UnaryClientInterceptor {
return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
ctx = transport.NewClientContext(ctx, &Transport{
endpoint: cc.Target(),
operation: method,
reqHeader: headerCarrier{},
filters: filters,
})
if timeout > 0 {
var cancel context.CancelFunc

@ -68,7 +68,7 @@ func EmptyMiddleware() middleware.Middleware {
}
func TestUnaryClientInterceptor(t *testing.T) {
f := unaryClientInterceptor([]middleware.Middleware{EmptyMiddleware()}, time.Duration(100))
f := unaryClientInterceptor([]middleware.Middleware{EmptyMiddleware()}, time.Duration(100), nil)
req := &struct{}{}
resp := &struct{}{}

@ -1,6 +1,7 @@
package grpc
import (
"github.com/go-kratos/kratos/v2/selector"
"github.com/go-kratos/kratos/v2/transport"
"google.golang.org/grpc/metadata"
)
@ -13,6 +14,7 @@ type Transport struct {
operation string
reqHeader headerCarrier
replyHeader headerCarrier
filters []selector.Filter
}
// Kind returns the transport kind.
@ -40,6 +42,11 @@ func (tr *Transport) ReplyHeader() transport.Header {
return tr.replyHeader
}
// Filters returns the client select filters.
func (tr *Transport) Filters() []selector.Filter {
return tr.filters
}
type headerCarrier metadata.MD
// Get returns the value associated with the passed key.

@ -17,7 +17,7 @@ import (
"github.com/go-kratos/kratos/v2/middleware"
"github.com/go-kratos/kratos/v2/registry"
"github.com/go-kratos/kratos/v2/selector"
"github.com/go-kratos/kratos/v2/selector/random"
"github.com/go-kratos/kratos/v2/selector/wrr"
"github.com/go-kratos/kratos/v2/transport"
)
@ -152,7 +152,7 @@ func NewClient(ctx context.Context, opts ...ClientOption) (*Client, error) {
decoder: DefaultResponseDecoder,
errorDecoder: DefaultErrorDecoder,
transport: http.DefaultTransport,
selector: random.New(),
selector: wrr.New(),
}
for _, o := range opts {
o(&options)

Loading…
Cancel
Save