diff --git a/contrib/registry/consul/client.go b/contrib/registry/consul/client.go index 80ea383a6..a3f84eb50 100644 --- a/contrib/registry/consul/client.go +++ b/contrib/registry/consul/client.go @@ -29,9 +29,26 @@ type Client struct { heartbeat bool // deregisterCriticalServiceAfter time interval in seconds deregisterCriticalServiceAfter int + // registryPortByScheme specifies the schema port registered to the service port + registryPortByScheme string +} + +// newClient creates consul client +func newClient(cli *api.Client, scheme string) *Client { + c := &Client{ + cli: cli, + resolver: defaultResolver, + healthcheckInterval: 10, + heartbeat: true, + deregisterCriticalServiceAfter: 600, + registryPortByScheme: scheme, + } + c.ctx, c.cancel = context.WithCancel(context.Background()) + return c } // NewClient creates consul client +// Deprecated func NewClient(cli *api.Client) *Client { c := &Client{ cli: cli, @@ -115,12 +132,26 @@ func (c *Client) Register(_ context.Context, svc *registry.ServiceInstance, enab Tags: []string{fmt.Sprintf("version=%s", svc.Version)}, TaggedAddresses: addresses, } + if len(checkAddresses) > 0 { host, portRaw, _ := net.SplitHostPort(checkAddresses[0]) port, _ := strconv.ParseInt(portRaw, 10, 32) asr.Address = host asr.Port = int(port) } + + if c.registryPortByScheme != "" { + for _, address := range checkAddresses { + if !strings.HasPrefix(address, c.registryPortByScheme) { + continue + } + host, portRaw, _ := net.SplitHostPort(checkAddresses[0]) + port, _ := strconv.ParseInt(portRaw, 10, 32) + asr.Address = host + asr.Port = int(port) + } + } + if enableHealthCheck { for _, address := range checkAddresses { asr.Checks = append(asr.Checks, &api.AgentServiceCheck{ diff --git a/contrib/registry/consul/registry.go b/contrib/registry/consul/registry.go index 9da95d77b..5b8cb5fcc 100644 --- a/contrib/registry/consul/registry.go +++ b/contrib/registry/consul/registry.go @@ -63,6 +63,14 @@ func WithDeregisterCriticalServiceAfter(interval int) Option { } } +func WithRegistryServicePortByScheme(scheme string) Option { + return func(o *Registry) { + if o.cli != nil { + o.registryPortByScheme = scheme + } + } +} + // Config is consul registry config type Config struct { *api.Config @@ -70,22 +78,23 @@ type Config struct { // Registry is consul registry type Registry struct { - cli *Client - enableHealthCheck bool - registry map[string]*serviceSet - lock sync.RWMutex + cli *Client + enableHealthCheck bool + registry map[string]*serviceSet + registryPortByScheme string + lock sync.RWMutex } // New creates consul registry func New(apiClient *api.Client, opts ...Option) *Registry { r := &Registry{ - cli: NewClient(apiClient), registry: make(map[string]*serviceSet), enableHealthCheck: true, } for _, o := range opts { o(r) } + r.cli = newClient(apiClient, r.registryPortByScheme) return r } diff --git a/contrib/registry/consul/registry_test.go b/contrib/registry/consul/registry_test.go index 483faf1ad..9e2d85aae 100644 --- a/contrib/registry/consul/registry_test.go +++ b/contrib/registry/consul/registry_test.go @@ -26,6 +26,7 @@ func tcpServer(t *testing.T, lis net.Listener) { func TestRegistry_Register(t *testing.T) { opts := []Option{ WithHealthCheck(false), + WithRegistryServicePortByScheme("http"), } type args struct { @@ -157,6 +158,7 @@ func TestRegistry_GetService(t *testing.T) { WithHeartbeat(true), WithHealthCheck(true), WithHealthCheckInterval(5), + WithRegistryServicePortByScheme("http"), } r := New(cli, opts...)