feat(registry): consul support specifying the schema port registered to the service port

feat/consul-port-scheme
baozhecheng 2 years ago
parent 0ce9e8d069
commit c7e08b4b78
  1. 31
      contrib/registry/consul/client.go
  2. 19
      contrib/registry/consul/registry.go
  3. 2
      contrib/registry/consul/registry_test.go

@ -29,9 +29,26 @@ type Client struct {
heartbeat bool heartbeat bool
// deregisterCriticalServiceAfter time interval in seconds // deregisterCriticalServiceAfter time interval in seconds
deregisterCriticalServiceAfter int deregisterCriticalServiceAfter int
// registryPortByScheme specifies the schema port registered to the service port
registryPortByScheme string
}
// newClient creates consul client
func newClient(cli *api.Client, scheme string) *Client {
c := &Client{
cli: cli,
resolver: defaultResolver,
healthcheckInterval: 10,
heartbeat: true,
deregisterCriticalServiceAfter: 600,
registryPortByScheme: scheme,
}
c.ctx, c.cancel = context.WithCancel(context.Background())
return c
} }
// NewClient creates consul client // NewClient creates consul client
// Deprecated
func NewClient(cli *api.Client) *Client { func NewClient(cli *api.Client) *Client {
c := &Client{ c := &Client{
cli: cli, cli: cli,
@ -115,12 +132,26 @@ func (c *Client) Register(_ context.Context, svc *registry.ServiceInstance, enab
Tags: []string{fmt.Sprintf("version=%s", svc.Version)}, Tags: []string{fmt.Sprintf("version=%s", svc.Version)},
TaggedAddresses: addresses, TaggedAddresses: addresses,
} }
if len(checkAddresses) > 0 { if len(checkAddresses) > 0 {
host, portRaw, _ := net.SplitHostPort(checkAddresses[0]) host, portRaw, _ := net.SplitHostPort(checkAddresses[0])
port, _ := strconv.ParseInt(portRaw, 10, 32) port, _ := strconv.ParseInt(portRaw, 10, 32)
asr.Address = host asr.Address = host
asr.Port = int(port) asr.Port = int(port)
} }
if c.registryPortByScheme != "" {
for _, address := range checkAddresses {
if !strings.HasPrefix(address, c.registryPortByScheme) {
continue
}
host, portRaw, _ := net.SplitHostPort(checkAddresses[0])
port, _ := strconv.ParseInt(portRaw, 10, 32)
asr.Address = host
asr.Port = int(port)
}
}
if enableHealthCheck { if enableHealthCheck {
for _, address := range checkAddresses { for _, address := range checkAddresses {
asr.Checks = append(asr.Checks, &api.AgentServiceCheck{ asr.Checks = append(asr.Checks, &api.AgentServiceCheck{

@ -63,6 +63,14 @@ func WithDeregisterCriticalServiceAfter(interval int) Option {
} }
} }
func WithRegistryServicePortByScheme(scheme string) Option {
return func(o *Registry) {
if o.cli != nil {
o.registryPortByScheme = scheme
}
}
}
// Config is consul registry config // Config is consul registry config
type Config struct { type Config struct {
*api.Config *api.Config
@ -70,22 +78,23 @@ type Config struct {
// Registry is consul registry // Registry is consul registry
type Registry struct { type Registry struct {
cli *Client cli *Client
enableHealthCheck bool enableHealthCheck bool
registry map[string]*serviceSet registry map[string]*serviceSet
lock sync.RWMutex registryPortByScheme string
lock sync.RWMutex
} }
// New creates consul registry // New creates consul registry
func New(apiClient *api.Client, opts ...Option) *Registry { func New(apiClient *api.Client, opts ...Option) *Registry {
r := &Registry{ r := &Registry{
cli: NewClient(apiClient),
registry: make(map[string]*serviceSet), registry: make(map[string]*serviceSet),
enableHealthCheck: true, enableHealthCheck: true,
} }
for _, o := range opts { for _, o := range opts {
o(r) o(r)
} }
r.cli = newClient(apiClient, r.registryPortByScheme)
return r return r
} }

@ -26,6 +26,7 @@ func tcpServer(t *testing.T, lis net.Listener) {
func TestRegistry_Register(t *testing.T) { func TestRegistry_Register(t *testing.T) {
opts := []Option{ opts := []Option{
WithHealthCheck(false), WithHealthCheck(false),
WithRegistryServicePortByScheme("http"),
} }
type args struct { type args struct {
@ -157,6 +158,7 @@ func TestRegistry_GetService(t *testing.T) {
WithHeartbeat(true), WithHeartbeat(true),
WithHealthCheck(true), WithHealthCheck(true),
WithHealthCheckInterval(5), WithHealthCheckInterval(5),
WithRegistryServicePortByScheme("http"),
} }
r := New(cli, opts...) r := New(cli, opts...)

Loading…
Cancel
Save