consul: support WithServiceResolver option (#1693)

* consul: add endpoints resolver option

* consul: resolver []*api.ServiceEntry -> []*registry.ServiceInstance

* consul: rename Resolver -> ServiceResolver
pull/1696/head
Cluas 3 years ago committed by GitHub
parent 2e045c3e42
commit 03f5ee015c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 65
      contrib/registry/consul/client.go
  2. 9
      contrib/registry/consul/registry.go

@ -18,38 +18,29 @@ type Client struct {
cli *api.Client
ctx context.Context
cancel context.CancelFunc
// resolve service entry endpoints
resolver ServiceResolver
}
// NewClient creates consul client
func NewClient(cli *api.Client) *Client {
c := &Client{cli: cli}
c := &Client{cli: cli, resolver: defaultResolver}
c.ctx, c.cancel = context.WithCancel(context.Background())
return c
}
// Service get services from consul
func (d *Client) Service(ctx context.Context, service string, index uint64, passingOnly bool) ([]*registry.ServiceInstance, uint64, error) {
opts := &api.QueryOptions{
WaitIndex: index,
WaitTime: time.Second * 55,
}
opts = opts.WithContext(ctx)
entries, meta, err := d.cli.Health().Service(service, "", passingOnly, opts)
if err != nil {
return nil, 0, err
}
services := make([]*registry.ServiceInstance, 0)
func defaultResolver(_ context.Context, entries []*api.ServiceEntry) []*registry.ServiceInstance {
services := make([]*registry.ServiceInstance, 0, len(entries))
for _, entry := range entries {
var version string
for _, tag := range entry.Service.Tags {
strs := strings.SplitN(tag, "=", 2)
if len(strs) == 2 && strs[0] == "version" {
version = strs[1]
ss := strings.SplitN(tag, "=", 2)
if len(ss) == 2 && ss[0] == "version" {
version = ss[1]
}
}
var endpoints []string
var endpoints []string //nolint:prealloc
for scheme, addr := range entry.Service.TaggedAddresses {
if scheme == "lan_ipv4" || scheme == "wan_ipv4" || scheme == "lan_ipv6" || scheme == "wan_ipv6" {
continue
@ -64,11 +55,29 @@ func (d *Client) Service(ctx context.Context, service string, index uint64, pass
Endpoints: endpoints,
})
}
return services, meta.LastIndex, nil
return services
}
// ServiceResolver is used to resolve service endpoints
type ServiceResolver func(ctx context.Context, entries []*api.ServiceEntry) []*registry.ServiceInstance
// Service get services from consul
func (c *Client) Service(ctx context.Context, service string, index uint64, passingOnly bool) ([]*registry.ServiceInstance, uint64, error) {
opts := &api.QueryOptions{
WaitIndex: index,
WaitTime: time.Second * 55,
}
opts = opts.WithContext(ctx)
entries, meta, err := c.cli.Health().Service(service, "", passingOnly, opts)
if err != nil {
return nil, 0, err
}
return c.resolver(ctx, entries), meta.LastIndex, nil
}
// Register register service instacen to consul
func (d *Client) Register(ctx context.Context, svc *registry.ServiceInstance, enableHealthCheck bool) error {
// Register register service instance to consul
func (c *Client) Register(_ context.Context, svc *registry.ServiceInstance, enableHealthCheck bool) error {
addresses := make(map[string]api.ServiceAddress)
checkAddresses := make([]string, 0, len(svc.Endpoints))
for _, endpoint := range svc.Endpoints {
@ -103,7 +112,7 @@ func (d *Client) Register(ctx context.Context, svc *registry.ServiceInstance, en
})
}
}
err := d.cli.Agent().ServiceRegister(asr)
err := c.cli.Agent().ServiceRegister(asr)
if err != nil {
return err
}
@ -113,8 +122,8 @@ func (d *Client) Register(ctx context.Context, svc *registry.ServiceInstance, en
for {
select {
case <-ticker.C:
_ = d.cli.Agent().UpdateTTL("service:"+svc.ID, "pass", "pass")
case <-d.ctx.Done():
_ = c.cli.Agent().UpdateTTL("service:"+svc.ID, "pass", "pass")
case <-c.ctx.Done():
return
}
}
@ -123,7 +132,7 @@ func (d *Client) Register(ctx context.Context, svc *registry.ServiceInstance, en
}
// Deregister deregister service by service ID
func (d *Client) Deregister(ctx context.Context, serviceID string) error {
d.cancel()
return d.cli.Agent().ServiceDeregister(serviceID)
func (c *Client) Deregister(_ context.Context, serviceID string) error {
c.cancel()
return c.cli.Agent().ServiceDeregister(serviceID)
}

@ -26,6 +26,15 @@ func WithHealthCheck(enable bool) Option {
}
}
// WithServiceResolver with endpoint function option.
func WithServiceResolver(fn ServiceResolver) Option {
return func(o *Registry) {
if o.cli != nil {
o.cli.resolver = fn
}
}
}
// Config is consul registry config
type Config struct {
*api.Config

Loading…
Cancel
Save