* - feat(contrib): add eureka registry (#1792) Co-authored-by: fangyong <fangyong@haodf.com>pull/1902/head
parent
7a5c2207a1
commit
69df1ab9a6
@ -0,0 +1,347 @@ |
||||
package eureka |
||||
|
||||
import ( |
||||
"bytes" |
||||
"context" |
||||
"encoding/json" |
||||
"fmt" |
||||
"io" |
||||
"io/ioutil" |
||||
"math/rand" |
||||
"net/http" |
||||
"strings" |
||||
"sync" |
||||
"time" |
||||
) |
||||
|
||||
const ( |
||||
statusUp = "UP" |
||||
statusDown = "DOWN" |
||||
statusOutOfServeice = "OUT_OF_SERVICE" |
||||
heartbeatRetry = 3 |
||||
maxIdleConns = 100 |
||||
heartbeatTime = 10 |
||||
httpTimeout = 3 |
||||
refreshTime = 30 |
||||
) |
||||
|
||||
type Endpoint struct { |
||||
InstanceID string |
||||
IP string |
||||
AppID string |
||||
Port int |
||||
SecurePort int |
||||
HomePageURL string |
||||
StatusPageURL string |
||||
HealthCheckURL string |
||||
MetaData map[string]string |
||||
} |
||||
|
||||
// Response for /eureka/apps
|
||||
type ApplicationsRootResponse struct { |
||||
ApplicationsResponse `json:"applications"` |
||||
} |
||||
|
||||
type ApplicationsResponse struct { |
||||
Version string `json:"versions__delta"` |
||||
AppsHashcode string `json:"apps__hashcode"` |
||||
Applications []Application `json:"application"` |
||||
} |
||||
|
||||
type Application struct { |
||||
Name string `json:"name"` |
||||
Instance []Instance `json:"instance"` |
||||
} |
||||
|
||||
type RequestInstance struct { |
||||
Instance Instance `json:"instance"` |
||||
} |
||||
|
||||
type Instance struct { |
||||
InstanceID string `json:"instanceId"` |
||||
HostName string `json:"hostName"` |
||||
Port Port `json:"port"` |
||||
App string `json:"app"` |
||||
IPAddr string `json:"ipAddr"` |
||||
VipAddress string `json:"vipAddress"` |
||||
Status string `json:"status"` |
||||
SecurePort Port `json:"securePort"` |
||||
HomePageURL string `json:"homePageUrl"` |
||||
StatusPageURL string `json:"statusPageUrl"` |
||||
HealthCheckURL string `json:"healthCheckUrl"` |
||||
DataCenterInfo DataCenterInfo `json:"dataCenterInfo"` |
||||
Metadata map[string]string `json:"metadata"` |
||||
} |
||||
|
||||
type Port struct { |
||||
Port int `json:"$"` |
||||
Enabled string `json:"@enabled"` |
||||
} |
||||
|
||||
type DataCenterInfo struct { |
||||
Name string `json:"name"` |
||||
Class string `json:"@class"` |
||||
} |
||||
|
||||
var _ APIInterface = new(Client) |
||||
|
||||
type APIInterface interface { |
||||
Register(ctx context.Context, ep Endpoint) error |
||||
Deregister(ctx context.Context, appID, instanceID string) error |
||||
Heartbeat(ep Endpoint) |
||||
FetchApps(ctx context.Context) []Application |
||||
FetchAllUpInstances(ctx context.Context) []Instance |
||||
FetchAppInstances(ctx context.Context, appID string) (m Application, err error) |
||||
FetchAppUpInstances(ctx context.Context, appID string) []Instance |
||||
FetchAppInstance(ctx context.Context, appID string, instanceID string) (m Instance, err error) |
||||
FetchInstance(ctx context.Context, instanceID string) (m Instance, err error) |
||||
Out(ctx context.Context, appID, instanceID string) error |
||||
Down(ctx context.Context, appID, instanceID string) error |
||||
} |
||||
|
||||
type ClientOption func(e *Client) |
||||
|
||||
func WithMaxRetry(maxRetry int) ClientOption { |
||||
return func(e *Client) { e.maxRetry = maxRetry } |
||||
} |
||||
|
||||
func WithHeartbeatInterval(interval time.Duration) ClientOption { |
||||
return func(e *Client) { |
||||
e.heartbeatInterval = interval |
||||
} |
||||
} |
||||
|
||||
func WithClientContext(ctx context.Context) ClientOption { |
||||
return func(e *Client) { e.ctx = ctx } |
||||
} |
||||
|
||||
func WithNamespace(path string) ClientOption { |
||||
return func(e *Client) { e.eurekaPath = path } |
||||
} |
||||
|
||||
type Client struct { |
||||
ctx context.Context |
||||
urls []string |
||||
eurekaPath string |
||||
maxRetry int |
||||
heartbeatInterval time.Duration |
||||
client *http.Client |
||||
keepalive map[string]chan struct{} |
||||
lock sync.Mutex |
||||
} |
||||
|
||||
func NewClient(urls []string, opts ...ClientOption) *Client { |
||||
tr := &http.Transport{ |
||||
MaxIdleConns: maxIdleConns, |
||||
} |
||||
|
||||
e := &Client{ |
||||
ctx: context.Background(), |
||||
urls: urls, |
||||
eurekaPath: "eureka/v2", |
||||
maxRetry: len(urls), |
||||
heartbeatInterval: time.Second * heartbeatTime, |
||||
client: &http.Client{Transport: tr, Timeout: time.Second * httpTimeout}, |
||||
keepalive: make(map[string]chan struct{}), |
||||
} |
||||
|
||||
for _, o := range opts { |
||||
o(e) |
||||
} |
||||
|
||||
return e |
||||
} |
||||
|
||||
func (e *Client) FetchApps(ctx context.Context) []Application { |
||||
var m ApplicationsRootResponse |
||||
if err := e.do(ctx, "GET", []string{"apps"}, nil, &m); err != nil { |
||||
return nil |
||||
} |
||||
|
||||
return m.Applications |
||||
} |
||||
|
||||
func (e *Client) FetchAppInstances(ctx context.Context, appID string) (m Application, err error) { |
||||
err = e.do(ctx, "GET", []string{"apps", appID}, nil, &m) |
||||
return |
||||
} |
||||
|
||||
func (e *Client) FetchAppUpInstances(ctx context.Context, appID string) []Instance { |
||||
app, err := e.FetchAppInstances(ctx, appID) |
||||
if err != nil { |
||||
return nil |
||||
} |
||||
return e.filterUp(app) |
||||
} |
||||
|
||||
func (e *Client) FetchAppInstance(ctx context.Context, appID string, instanceID string) (m Instance, err error) { |
||||
err = e.do(ctx, "GET", []string{"apps", appID, instanceID}, nil, &m) |
||||
return |
||||
} |
||||
|
||||
func (e *Client) FetchInstance(ctx context.Context, instanceID string) (m Instance, err error) { |
||||
err = e.do(ctx, "GET", []string{"instances", instanceID}, nil, &m) |
||||
return |
||||
} |
||||
|
||||
func (e *Client) Out(ctx context.Context, appID, instanceID string) error { |
||||
return e.do(ctx, "PUT", []string{"apps", appID, instanceID, fmt.Sprintf("status?value=%s", statusOutOfServeice)}, nil, nil) |
||||
} |
||||
|
||||
func (e *Client) Down(ctx context.Context, appID, instanceID string) error { |
||||
return e.do(ctx, "PUT", []string{"apps", appID, instanceID, fmt.Sprintf("status?value=%s", statusDown)}, nil, nil) |
||||
} |
||||
|
||||
func (e *Client) FetchAllUpInstances(ctx context.Context) []Instance { |
||||
return e.filterUp(e.FetchApps(ctx)...) |
||||
} |
||||
|
||||
func (e *Client) Register(ctx context.Context, ep Endpoint) error { |
||||
return e.registerEndpoint(ctx, ep) |
||||
} |
||||
|
||||
func (e *Client) Deregister(ctx context.Context, appID, instanceID string) error { |
||||
if err := e.do(ctx, "DELETE", []string{"apps", appID, instanceID}, nil, nil); err != nil { |
||||
return err |
||||
} |
||||
go e.cancelHeartbeat(appID) |
||||
return nil |
||||
} |
||||
|
||||
func (e *Client) registerEndpoint(ctx context.Context, ep Endpoint) error { |
||||
instance := RequestInstance{ |
||||
Instance: Instance{ |
||||
InstanceID: ep.InstanceID, |
||||
HostName: ep.AppID, |
||||
Port: Port{ |
||||
Port: ep.Port, |
||||
Enabled: "true", |
||||
}, |
||||
App: ep.AppID, |
||||
IPAddr: ep.IP, |
||||
VipAddress: ep.AppID, |
||||
Status: statusUp, |
||||
SecurePort: Port{ |
||||
Port: ep.SecurePort, |
||||
Enabled: "false", |
||||
}, |
||||
HomePageURL: ep.HomePageURL, |
||||
StatusPageURL: ep.StatusPageURL, |
||||
HealthCheckURL: ep.HealthCheckURL, |
||||
DataCenterInfo: DataCenterInfo{ |
||||
Name: "MyOwn", |
||||
Class: "com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo", |
||||
}, |
||||
Metadata: ep.MetaData, |
||||
}, |
||||
} |
||||
|
||||
body, err := json.Marshal(instance) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
return e.do(ctx, "POST", []string{"apps", ep.AppID}, bytes.NewReader(body), nil) |
||||
} |
||||
|
||||
func (e *Client) Heartbeat(ep Endpoint) { |
||||
e.lock.Lock() |
||||
e.keepalive[ep.AppID] = make(chan struct{}) |
||||
e.lock.Unlock() |
||||
|
||||
ticker := time.NewTicker(e.heartbeatInterval) |
||||
defer ticker.Stop() |
||||
retryCount := 0 |
||||
for { |
||||
select { |
||||
case <-e.ctx.Done(): |
||||
return |
||||
case <-e.keepalive[ep.AppID]: |
||||
return |
||||
case <-ticker.C: |
||||
if err := e.do(e.ctx, "PUT", []string{"apps", ep.AppID, ep.InstanceID}, nil, nil); err != nil { |
||||
if retryCount++; retryCount > heartbeatRetry { |
||||
_ = e.registerEndpoint(e.ctx, ep) |
||||
retryCount = 0 |
||||
} |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
func (e *Client) cancelHeartbeat(appID string) { |
||||
defer e.lock.Unlock() |
||||
e.lock.Lock() |
||||
if ch, ok := e.keepalive[appID]; ok { |
||||
ch <- struct{}{} |
||||
} |
||||
} |
||||
|
||||
func (e *Client) filterUp(apps ...Application) (res []Instance) { |
||||
for _, app := range apps { |
||||
for _, ins := range app.Instance { |
||||
if ins.Status == statusUp { |
||||
res = append(res, ins) |
||||
} |
||||
} |
||||
} |
||||
|
||||
return |
||||
} |
||||
|
||||
func (e *Client) pickServer(currentTimes int) string { |
||||
return e.urls[currentTimes%e.maxRetry] |
||||
} |
||||
|
||||
func (e *Client) shuffle() { |
||||
rand.Seed(time.Now().UnixNano()) |
||||
rand.Shuffle(len(e.urls), func(i, j int) { |
||||
e.urls[i], e.urls[j] = e.urls[j], e.urls[i] |
||||
}) |
||||
} |
||||
|
||||
func (e *Client) buildAPI(currentTimes int, params ...string) string { |
||||
if currentTimes == 0 { |
||||
e.shuffle() |
||||
} |
||||
server := e.pickServer(currentTimes) |
||||
params = append([]string{server, e.eurekaPath}, params...) |
||||
return strings.Join(params, "/") |
||||
} |
||||
|
||||
func (e *Client) do(ctx context.Context, method string, params []string, input io.Reader, output interface{}) error { |
||||
for i := 0; i < e.maxRetry; i++ { |
||||
request, err := http.NewRequest(method, e.buildAPI(i, params...), input) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
request = request.WithContext(ctx) |
||||
request.Header.Add("User-Agent", "go-eureka-client") |
||||
request.Header.Add("Accept", "application/json;charset=UTF-8") |
||||
request.Header.Add("Content-Type", "application/json;charset=UTF-8") |
||||
resp, err := e.client.Do(request) |
||||
if err != nil { |
||||
continue |
||||
} |
||||
defer func() { |
||||
_, _ = io.Copy(ioutil.Discard, resp.Body) |
||||
resp.Body.Close() |
||||
}() |
||||
|
||||
if output != nil && resp.StatusCode/100 == 2 { |
||||
data, err := ioutil.ReadAll(resp.Body) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
if err = json.Unmarshal(data, output); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
|
||||
if resp.StatusCode >= http.StatusBadRequest { |
||||
return fmt.Errorf("response Error %d", resp.StatusCode) |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
return fmt.Errorf("retry after %d times", e.maxRetry) |
||||
} |
@ -0,0 +1,136 @@ |
||||
package eureka |
||||
|
||||
import ( |
||||
"context" |
||||
"strings" |
||||
"sync" |
||||
"time" |
||||
) |
||||
|
||||
type subscriber struct { |
||||
appID string |
||||
callBack func() |
||||
} |
||||
|
||||
type API struct { |
||||
cli *Client |
||||
allInstances map[string][]Instance |
||||
subscribers map[string]*subscriber |
||||
refreshInterval time.Duration |
||||
lock sync.Mutex |
||||
} |
||||
|
||||
func NewAPI(ctx context.Context, client *Client, refreshInterval time.Duration) *API { |
||||
e := &API{ |
||||
cli: client, |
||||
allInstances: make(map[string][]Instance), |
||||
subscribers: make(map[string]*subscriber), |
||||
refreshInterval: refreshInterval, |
||||
} |
||||
|
||||
// 首次广播一次
|
||||
e.broadcast() |
||||
|
||||
go e.refresh(ctx) |
||||
|
||||
return e |
||||
} |
||||
|
||||
func (e *API) refresh(ctx context.Context) { |
||||
ticker := time.NewTicker(e.refreshInterval) |
||||
defer ticker.Stop() |
||||
for { |
||||
select { |
||||
case <-ctx.Done(): |
||||
return |
||||
case <-ticker.C: |
||||
e.broadcast() |
||||
} |
||||
} |
||||
} |
||||
|
||||
func (e *API) broadcast() { |
||||
instances := e.cacheAllInstances() |
||||
if instances == nil { |
||||
return |
||||
} |
||||
|
||||
for _, subscriber := range e.subscribers { |
||||
go subscriber.callBack() |
||||
} |
||||
defer e.lock.Unlock() |
||||
e.lock.Lock() |
||||
e.allInstances = instances |
||||
} |
||||
|
||||
func (e *API) cacheAllInstances() map[string][]Instance { |
||||
items := make(map[string][]Instance) |
||||
instances := e.cli.FetchAllUpInstances(context.Background()) |
||||
for _, instance := range instances { |
||||
items[e.ToAppID(instance.App)] = append(items[instance.App], instance) |
||||
} |
||||
|
||||
return items |
||||
} |
||||
|
||||
func (e *API) Register(ctx context.Context, serviceName string, endpoints ...Endpoint) error { |
||||
appID := e.ToAppID(serviceName) |
||||
upInstances := make(map[string]struct{}) |
||||
|
||||
for _, ins := range e.GetService(ctx, appID) { |
||||
upInstances[ins.InstanceID] = struct{}{} |
||||
} |
||||
|
||||
for _, ep := range endpoints { |
||||
if _, ok := upInstances[ep.InstanceID]; !ok { |
||||
if err := e.cli.Register(ctx, ep); err != nil { |
||||
return err |
||||
} |
||||
go e.cli.Heartbeat(ep) |
||||
} |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// Deregister 中的ctx 和 register ctx 是同一个
|
||||
func (e *API) Deregister(ctx context.Context, endpoints []Endpoint) error { |
||||
for _, ep := range endpoints { |
||||
if err := e.cli.Deregister(ctx, ep.AppID, ep.InstanceID); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func (e *API) Subscribe(serverName string, fn func()) error { |
||||
e.lock.Lock() |
||||
defer e.lock.Unlock() |
||||
appID := e.ToAppID(serverName) |
||||
e.subscribers[appID] = &subscriber{ |
||||
appID: appID, |
||||
callBack: fn, |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (e *API) GetService(ctx context.Context, serverName string) []Instance { |
||||
appID := e.ToAppID(serverName) |
||||
if ins, ok := e.allInstances[appID]; ok { |
||||
return ins |
||||
} |
||||
|
||||
// 如果不再allinstances 中可以尝试再单独获取一次
|
||||
return e.cli.FetchAppUpInstances(ctx, appID) |
||||
} |
||||
|
||||
func (e *API) Unsubscribe(serverName string) { |
||||
e.lock.Lock() |
||||
defer e.lock.Unlock() |
||||
delete(e.subscribers, e.ToAppID(serverName)) |
||||
} |
||||
|
||||
func (e *API) ToAppID(serverName string) string { |
||||
return strings.ToUpper(serverName) |
||||
} |
@ -0,0 +1,7 @@ |
||||
module github.com/go-kratos/kratos/contrib/registry/eureka/v2 |
||||
|
||||
go 1.16 |
||||
|
||||
require github.com/go-kratos/kratos/v2 v2.1.5 |
||||
|
||||
replace github.com/go-kratos/kratos/v2 => ../../../ |
@ -0,0 +1,154 @@ |
||||
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= |
||||
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= |
||||
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= |
||||
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= |
||||
github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8= |
||||
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= |
||||
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= |
||||
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= |
||||
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= |
||||
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= |
||||
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= |
||||
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= |
||||
github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= |
||||
github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= |
||||
github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= |
||||
github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= |
||||
github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= |
||||
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= |
||||
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= |
||||
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= |
||||
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= |
||||
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= |
||||
github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= |
||||
github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= |
||||
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= |
||||
github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU= |
||||
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= |
||||
github.com/go-kratos/aegis v0.1.1/go.mod h1:jYeSQ3Gesba478zEnujOiG5QdsyF3Xk/8owFUeKcHxw= |
||||
github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= |
||||
github.com/go-logr/logr v1.2.1/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= |
||||
github.com/go-logr/stdr v1.2.0/go.mod h1:YkVgnZu1ZjjL7xTxrfm/LLZBfkhTqSR1ydtm6jTKKwI= |
||||
github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= |
||||
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= |
||||
github.com/go-playground/form/v4 v4.2.0/go.mod h1:q1a2BY+AQUUzhl6xA/6hBetay6dEIhMHjgvJiGo6K7U= |
||||
github.com/golang-jwt/jwt/v4 v4.2.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= |
||||
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= |
||||
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= |
||||
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= |
||||
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= |
||||
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= |
||||
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= |
||||
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= |
||||
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= |
||||
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= |
||||
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= |
||||
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= |
||||
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= |
||||
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= |
||||
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= |
||||
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= |
||||
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= |
||||
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= |
||||
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= |
||||
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= |
||||
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= |
||||
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= |
||||
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= |
||||
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= |
||||
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= |
||||
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= |
||||
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= |
||||
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= |
||||
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= |
||||
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= |
||||
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= |
||||
github.com/shirou/gopsutil/v3 v3.21.8/go.mod h1:YWp/H8Qs5fVmf17v7JNZzA0mPJ+mS2e9JdiUF9LlKzQ= |
||||
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= |
||||
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= |
||||
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= |
||||
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= |
||||
github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs= |
||||
github.com/tklauser/numcpus v0.3.0/go.mod h1:yFGUr7TUHQRAhyqBcEg0Ge34zDBAsIvJJcyE6boqnA8= |
||||
go.opentelemetry.io/otel v1.3.0/go.mod h1:PWIKzi6JCp7sM0k9yZ43VX+T345uNbAkDKwHVjb2PTs= |
||||
go.opentelemetry.io/otel/sdk v1.3.0/go.mod h1:rIo4suHNhQwBIPg9axF8V9CA72Wz2mKF1teNrup8yzs= |
||||
go.opentelemetry.io/otel/trace v1.3.0/go.mod h1:c/VDhno8888bvQYmbYLqe41/Ldmr/KKunbvWM4/fEjk= |
||||
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= |
||||
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= |
||||
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= |
||||
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= |
||||
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= |
||||
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= |
||||
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= |
||||
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= |
||||
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= |
||||
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= |
||||
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= |
||||
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= |
||||
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= |
||||
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= |
||||
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= |
||||
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= |
||||
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= |
||||
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= |
||||
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= |
||||
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= |
||||
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= |
||||
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= |
||||
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= |
||||
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= |
||||
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= |
||||
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= |
||||
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= |
||||
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= |
||||
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= |
||||
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= |
||||
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= |
||||
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= |
||||
golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= |
||||
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= |
||||
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= |
||||
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= |
||||
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= |
||||
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= |
||||
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= |
||||
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= |
||||
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= |
||||
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= |
||||
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= |
||||
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= |
||||
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= |
||||
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= |
||||
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= |
||||
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= |
||||
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= |
||||
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= |
||||
google.golang.org/genproto v0.0.0-20220126215142-9970aeb2e350/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= |
||||
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= |
||||
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= |
||||
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= |
||||
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= |
||||
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= |
||||
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= |
||||
google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= |
||||
google.golang.org/grpc v1.44.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= |
||||
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= |
||||
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= |
||||
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= |
||||
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= |
||||
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= |
||||
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= |
||||
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= |
||||
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= |
||||
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= |
||||
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= |
||||
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= |
||||
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= |
||||
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= |
||||
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= |
||||
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= |
||||
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= |
||||
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= |
||||
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= |
||||
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= |
||||
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= |
@ -0,0 +1,146 @@ |
||||
package eureka |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"strconv" |
||||
"strings" |
||||
"time" |
||||
|
||||
"github.com/go-kratos/kratos/v2/registry" |
||||
) |
||||
|
||||
var ( |
||||
_ registry.Registrar = &Registry{} |
||||
_ registry.Discovery = &Registry{} |
||||
) |
||||
|
||||
type Option func(o *Registry) |
||||
|
||||
// WithContext with registry context.
|
||||
func WithContext(ctx context.Context) Option { |
||||
return func(o *Registry) { o.ctx = ctx } |
||||
} |
||||
|
||||
func WithHeartbeat(interval time.Duration) Option { |
||||
return func(o *Registry) { o.heartbeatInterval = interval } |
||||
} |
||||
|
||||
func WithRefresh(interval time.Duration) Option { |
||||
return func(o *Registry) { o.refreshInterval = interval } |
||||
} |
||||
|
||||
func WithEurekaPath(path string) Option { |
||||
return func(o *Registry) { o.eurekaPath = path } |
||||
} |
||||
|
||||
type Registry struct { |
||||
ctx context.Context |
||||
api *API |
||||
heartbeatInterval time.Duration |
||||
refreshInterval time.Duration |
||||
eurekaPath string |
||||
} |
||||
|
||||
func New(eurekaUrls []string, opts ...Option) (*Registry, error) { |
||||
r := &Registry{ |
||||
ctx: context.Background(), |
||||
heartbeatInterval: heartbeatTime, |
||||
refreshInterval: refreshTime, |
||||
eurekaPath: "eureka/v2", |
||||
} |
||||
|
||||
for _, o := range opts { |
||||
o(r) |
||||
} |
||||
|
||||
client := NewClient(eurekaUrls, WithHeartbeatInterval(r.heartbeatInterval), WithClientContext(r.ctx), WithNamespace(r.eurekaPath)) |
||||
r.api = NewAPI(r.ctx, client, r.refreshInterval) |
||||
return r, nil |
||||
} |
||||
|
||||
// 这里的Context是每个注册器独享的
|
||||
func (r *Registry) Register(ctx context.Context, service *registry.ServiceInstance) error { |
||||
return r.api.Register(ctx, service.Name, r.Endpoints(service)...) |
||||
} |
||||
|
||||
// Deregister registry service to zookeeper.
|
||||
func (r *Registry) Deregister(ctx context.Context, service *registry.ServiceInstance) error { |
||||
return r.api.Deregister(ctx, r.Endpoints(service)) |
||||
} |
||||
|
||||
// GetService get services from zookeeper
|
||||
func (r *Registry) GetService(ctx context.Context, serviceName string) ([]*registry.ServiceInstance, error) { |
||||
instances := r.api.GetService(ctx, serviceName) |
||||
items := make([]*registry.ServiceInstance, 0, len(instances)) |
||||
for _, instance := range instances { |
||||
items = append(items, ®istry.ServiceInstance{ |
||||
ID: instance.Metadata["ID"], |
||||
Name: instance.Metadata["Name"], |
||||
Version: instance.Metadata["Version"], |
||||
Endpoints: []string{instance.Metadata["Endpoints"]}, |
||||
Metadata: instance.Metadata, |
||||
}) |
||||
} |
||||
|
||||
return items, nil |
||||
} |
||||
|
||||
// watch 是独立的ctx
|
||||
func (r *Registry) Watch(ctx context.Context, serviceName string) (registry.Watcher, error) { |
||||
return newWatch(ctx, r.api, serviceName) |
||||
} |
||||
|
||||
func (r *Registry) Endpoints(service *registry.ServiceInstance) []Endpoint { |
||||
var ( |
||||
res = []Endpoint{} |
||||
start int |
||||
) |
||||
for _, ep := range service.Endpoints { |
||||
start = strings.Index(ep, "//") |
||||
end := strings.LastIndex(ep, ":") |
||||
appID := strings.ToUpper(service.Name) |
||||
ip := ep[start+2 : end] |
||||
sport := ep[end+1:] |
||||
port, _ := strconv.Atoi(sport) |
||||
securePort := 443 |
||||
homePageURL := fmt.Sprintf("%s/", ep) |
||||
statusPageURL := fmt.Sprintf("%s/info", ep) |
||||
healthCheckURL := fmt.Sprintf("%s/health", ep) |
||||
instanceID := strings.Join([]string{ip, appID, sport}, ":") |
||||
metadata := make(map[string]string) |
||||
if len(service.Metadata) > 0 { |
||||
metadata = service.Metadata |
||||
} |
||||
if s, ok := service.Metadata["securePort"]; ok { |
||||
securePort, _ = strconv.Atoi(s) |
||||
} |
||||
if s, ok := service.Metadata["homePageURL"]; ok { |
||||
homePageURL = s |
||||
} |
||||
if s, ok := service.Metadata["statusPageURL"]; ok { |
||||
statusPageURL = s |
||||
} |
||||
if s, ok := service.Metadata["healthCheckURL"]; ok { |
||||
healthCheckURL = s |
||||
} |
||||
metadata["ID"] = service.ID |
||||
metadata["Name"] = service.Name |
||||
metadata["Version"] = service.Version |
||||
metadata["Endpoints"] = ep |
||||
metadata["agent"] = "go-eureka-client" |
||||
res = append(res, Endpoint{ |
||||
AppID: appID, |
||||
IP: ip, |
||||
Port: port, |
||||
SecurePort: securePort, |
||||
HomePageURL: homePageURL, |
||||
StatusPageURL: statusPageURL, |
||||
HealthCheckURL: healthCheckURL, |
||||
InstanceID: instanceID, |
||||
MetaData: metadata, |
||||
}) |
||||
} |
||||
|
||||
return res |
||||
} |
@ -0,0 +1,114 @@ |
||||
package eureka |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"log" |
||||
"sync" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/go-kratos/kratos/v2/registry" |
||||
) |
||||
|
||||
func TestRegistry(t *testing.T) { |
||||
ctx := context.Background() |
||||
ctx, cancel := context.WithCancel(ctx) |
||||
s1 := ®istry.ServiceInstance{ |
||||
ID: "0", |
||||
Name: "helloworld", |
||||
Endpoints: []string{"http://127.0.0.1:1111"}, |
||||
} |
||||
s2 := ®istry.ServiceInstance{ |
||||
ID: "0", |
||||
Name: "helloworld2", |
||||
Endpoints: []string{"http://127.0.0.1:222"}, |
||||
} |
||||
|
||||
r, _ := New([]string{"https://127.0.0.1:18761"}, WithContext(ctx), WithHeartbeat(time.Second), WithRefresh(time.Second), WithEurekaPath("eureka")) |
||||
|
||||
go do(r, s1) |
||||
go do(r, s2) |
||||
|
||||
time.Sleep(time.Second * 20) |
||||
cancel() |
||||
time.Sleep(time.Second * 1) |
||||
} |
||||
|
||||
func do(r *Registry, s *registry.ServiceInstance) { |
||||
w, err := r.Watch(context.Background(), s.Name) |
||||
if err != nil { |
||||
log.Fatal(err) |
||||
} |
||||
defer func() { |
||||
_ = w.Stop() |
||||
}() |
||||
go func() { |
||||
for { |
||||
res, nextErr := w.Next() |
||||
if nextErr != nil { |
||||
return |
||||
} |
||||
log.Printf("watch: %d", len(res)) |
||||
for _, r := range res { |
||||
log.Printf("next: %+v", r) |
||||
} |
||||
} |
||||
}() |
||||
|
||||
ctx, cancel := context.WithCancel(context.Background()) |
||||
if err = r.Register(ctx, s); err != nil { |
||||
log.Fatal(err) |
||||
} |
||||
|
||||
time.Sleep(time.Second * 10) |
||||
res, err := r.GetService(ctx, s.Name) |
||||
if err != nil { |
||||
log.Fatal(err) |
||||
} |
||||
for i, re := range res { |
||||
log.Printf("first %d re:%v\n", i, re) |
||||
} |
||||
|
||||
if len(res) != 1 && res[0].Name != s.Name { |
||||
log.Fatalf("not expected: %+v", res) |
||||
} |
||||
|
||||
if err = r.Deregister(ctx, s); err != nil { |
||||
log.Fatal(err) |
||||
} |
||||
cancel() |
||||
time.Sleep(time.Second * 10) |
||||
|
||||
res, err = r.GetService(ctx, s.Name) |
||||
if err != nil { |
||||
log.Fatal(err) |
||||
} |
||||
for i, re := range res { |
||||
log.Printf("second %d re:%v\n", i, re) |
||||
} |
||||
if len(res) != 0 { |
||||
log.Fatalf("not expected empty") |
||||
} |
||||
} |
||||
|
||||
func TestLock(t *testing.T) { |
||||
type me struct { |
||||
lock sync.Mutex |
||||
} |
||||
|
||||
a := &me{} |
||||
go func() { |
||||
defer a.lock.Unlock() |
||||
a.lock.Lock() |
||||
fmt.Println("This is fmt first.") |
||||
time.Sleep(time.Second * 5) |
||||
}() |
||||
go func() { |
||||
defer a.lock.Unlock() |
||||
a.lock.Lock() |
||||
fmt.Println("This is fmt second.") |
||||
time.Sleep(time.Second * 5) |
||||
}() |
||||
time.Sleep(time.Second * 10) |
||||
} |
@ -0,0 +1,60 @@ |
||||
package eureka |
||||
|
||||
import ( |
||||
"context" |
||||
|
||||
"github.com/go-kratos/kratos/v2/registry" |
||||
) |
||||
|
||||
var _ registry.Watcher = &watcher{} |
||||
|
||||
type watcher struct { |
||||
ctx context.Context |
||||
cancel context.CancelFunc |
||||
cli *API |
||||
watchChan chan struct{} |
||||
serverName string |
||||
} |
||||
|
||||
func newWatch(ctx context.Context, cli *API, serverName string) (*watcher, error) { |
||||
w := &watcher{ |
||||
ctx: ctx, |
||||
cli: cli, |
||||
serverName: serverName, |
||||
watchChan: make(chan struct{}, 1), |
||||
} |
||||
w.ctx, w.cancel = context.WithCancel(ctx) |
||||
e := w.cli.Subscribe( |
||||
serverName, |
||||
func() { |
||||
w.watchChan <- struct{}{} |
||||
}, |
||||
) |
||||
return w, e |
||||
} |
||||
|
||||
func (w watcher) Next() (services []*registry.ServiceInstance, err error) { |
||||
select { |
||||
case <-w.ctx.Done(): |
||||
return nil, w.ctx.Err() |
||||
case <-w.watchChan: |
||||
instances := w.cli.GetService(w.ctx, w.serverName) |
||||
services = make([]*registry.ServiceInstance, 0, len(instances)) |
||||
for _, instance := range instances { |
||||
services = append(services, ®istry.ServiceInstance{ |
||||
ID: instance.Metadata["ID"], |
||||
Name: instance.Metadata["Name"], |
||||
Version: instance.Metadata["Version"], |
||||
Endpoints: []string{instance.Metadata["Endpoints"]}, |
||||
Metadata: instance.Metadata, |
||||
}) |
||||
} |
||||
return |
||||
} |
||||
} |
||||
|
||||
func (w *watcher) Stop() error { |
||||
w.cancel() |
||||
w.cli.Unsubscribe(w.serverName) |
||||
return nil |
||||
} |
Loading…
Reference in new issue