feat: consul registry support get services from multiple datacenters (#2536)

* feat: consul支持多数据中心

* feat: fix comments

* refactor: param name

* fix: comments fix

* refactor: param name

* refactor: function name
pull/2556/head
woniu317 2 years ago committed by GitHub
parent 3393990cd8
commit 2cf82fa4a7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 68
      contrib/registry/consul/client.go
  2. 11
      contrib/registry/consul/registry.go

@ -15,8 +15,16 @@ import (
"github.com/hashicorp/consul/api" "github.com/hashicorp/consul/api"
) )
type Datacenter string
const (
SingleDatacenter Datacenter = "SINGLE"
MultiDatacenter Datacenter = "MULTI"
)
// Client is consul client config // Client is consul client config
type Client struct { type Client struct {
dc Datacenter
cli *api.Client cli *api.Client
ctx context.Context ctx context.Context
cancel context.CancelFunc cancel context.CancelFunc
@ -33,15 +41,21 @@ type Client struct {
serviceChecks api.AgentServiceChecks serviceChecks api.AgentServiceChecks
} }
// NewClient creates consul client // Deprecated use newClient instead.
func NewClient(cli *api.Client) *Client { func NewClient(cli *api.Client) *Client {
return newClient(cli, SingleDatacenter)
}
func newClient(cli *api.Client, dc Datacenter) *Client {
c := &Client{ c := &Client{
dc: dc,
cli: cli, cli: cli,
resolver: defaultResolver, resolver: defaultResolver,
healthcheckInterval: 10, healthcheckInterval: 10,
heartbeat: true, heartbeat: true,
deregisterCriticalServiceAfter: 600, deregisterCriticalServiceAfter: 600,
} }
c.ctx, c.cancel = context.WithCancel(context.Background()) c.ctx, c.cancel = context.WithCancel(context.Background())
return c return c
} }
@ -83,18 +97,68 @@ type ServiceResolver func(ctx context.Context, entries []*api.ServiceEntry) []*r
// Service get services from consul // Service get services from consul
func (c *Client) Service(ctx context.Context, service string, index uint64, passingOnly bool) ([]*registry.ServiceInstance, uint64, error) { func (c *Client) Service(ctx context.Context, service string, index uint64, passingOnly bool) ([]*registry.ServiceInstance, uint64, error) {
if c.dc == MultiDatacenter {
return c.multiDCService(ctx, service, index, passingOnly)
}
opts := &api.QueryOptions{ opts := &api.QueryOptions{
WaitIndex: index, WaitIndex: index,
WaitTime: time.Second * 55, WaitTime: time.Second * 55,
Datacenter: string(c.dc),
} }
opts = opts.WithContext(ctx) opts = opts.WithContext(ctx)
entries, meta, err := c.cli.Health().Service(service, "", passingOnly, opts)
if c.dc == SingleDatacenter {
opts.Datacenter = ""
}
entries, meta, err := c.singleDCEntries(service, "", passingOnly, opts)
if err != nil { if err != nil {
return nil, 0, err return nil, 0, err
} }
return c.resolver(ctx, entries), meta.LastIndex, nil return c.resolver(ctx, entries), meta.LastIndex, nil
} }
func (c *Client) multiDCService(ctx context.Context, service string, index uint64, passingOnly bool) ([]*registry.ServiceInstance, uint64, error) {
opts := &api.QueryOptions{
WaitIndex: index,
WaitTime: time.Second * 55,
}
opts = opts.WithContext(ctx)
var instances []*registry.ServiceInstance
dcs, err := c.cli.Catalog().Datacenters()
if err != nil {
return nil, 0, err
}
for _, dc := range dcs {
opts.Datacenter = dc
e, m, err := c.singleDCEntries(service, "", passingOnly, opts)
if err != nil {
return nil, 0, err
}
ins := c.resolver(ctx, e)
for _, in := range ins {
if in.Metadata == nil {
in.Metadata = make(map[string]string, 1)
}
in.Metadata["dc"] = dc
}
instances = append(instances, ins...)
opts.WaitIndex = m.LastIndex
}
return instances, opts.WaitIndex, nil
}
func (c *Client) singleDCEntries(service, tag string, passingOnly bool, opts *api.QueryOptions) ([]*api.ServiceEntry, *api.QueryMeta, error) {
return c.cli.Health().Service(service, tag, passingOnly, opts)
}
// Register register service instance to consul // Register register service instance to consul
func (c *Client) Register(_ context.Context, svc *registry.ServiceInstance, enableHealthCheck bool) error { func (c *Client) Register(_ context.Context, svc *registry.ServiceInstance, enableHealthCheck bool) error {
addresses := make(map[string]api.ServiceAddress, len(svc.Endpoints)) addresses := make(map[string]api.ServiceAddress, len(svc.Endpoints))

@ -27,6 +27,13 @@ func WithHealthCheck(enable bool) Option {
} }
} }
// WithDatacenter with registry datacenter option
func WithDatacenter(dc Datacenter) Option {
return func(o *Registry) {
o.dc = dc
}
}
// WithHeartbeat enable or disable heartbeat // WithHeartbeat enable or disable heartbeat
func WithHeartbeat(enable bool) Option { func WithHeartbeat(enable bool) Option {
return func(o *Registry) { return func(o *Registry) {
@ -83,18 +90,20 @@ type Registry struct {
enableHealthCheck bool enableHealthCheck bool
registry map[string]*serviceSet registry map[string]*serviceSet
lock sync.RWMutex lock sync.RWMutex
dc Datacenter
} }
// New creates consul registry // New creates consul registry
func New(apiClient *api.Client, opts ...Option) *Registry { func New(apiClient *api.Client, opts ...Option) *Registry {
r := &Registry{ r := &Registry{
cli: NewClient(apiClient), dc: SingleDatacenter,
registry: make(map[string]*serviceSet), registry: make(map[string]*serviceSet),
enableHealthCheck: true, enableHealthCheck: true,
} }
for _, o := range opts { for _, o := range opts {
o(r) o(r)
} }
r.cli = newClient(apiClient, r.dc)
return r return r
} }

Loading…
Cancel
Save