From db933457338beb8724dd55d27f8b2ca5e56bc695 Mon Sep 17 00:00:00 2001 From: weetime <351075478@qq.com> Date: Tue, 29 Mar 2022 14:25:05 +0800 Subject: [PATCH] feat(contrib): add eureka registry (#1792) (#1793) * - feat(contrib): add eureka registry (#1792) Co-authored-by: fangyong --- contrib/registry/eureka/client.go | 347 +++++++++++++++++++++++ contrib/registry/eureka/eureka.go | 136 +++++++++ contrib/registry/eureka/go.mod | 7 + contrib/registry/eureka/go.sum | 154 ++++++++++ contrib/registry/eureka/register.go | 146 ++++++++++ contrib/registry/eureka/register_test.go | 114 ++++++++ contrib/registry/eureka/watcher.go | 60 ++++ hack/.test_ignored_files | 1 + 8 files changed, 965 insertions(+) create mode 100644 contrib/registry/eureka/client.go create mode 100644 contrib/registry/eureka/eureka.go create mode 100644 contrib/registry/eureka/go.mod create mode 100644 contrib/registry/eureka/go.sum create mode 100644 contrib/registry/eureka/register.go create mode 100644 contrib/registry/eureka/register_test.go create mode 100644 contrib/registry/eureka/watcher.go diff --git a/contrib/registry/eureka/client.go b/contrib/registry/eureka/client.go new file mode 100644 index 000000000..5895a0576 --- /dev/null +++ b/contrib/registry/eureka/client.go @@ -0,0 +1,347 @@ +package eureka + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "io/ioutil" + "math/rand" + "net/http" + "strings" + "sync" + "time" +) + +const ( + statusUp = "UP" + statusDown = "DOWN" + statusOutOfServeice = "OUT_OF_SERVICE" + heartbeatRetry = 3 + maxIdleConns = 100 + heartbeatTime = 10 + httpTimeout = 3 + refreshTime = 30 +) + +type Endpoint struct { + InstanceID string + IP string + AppID string + Port int + SecurePort int + HomePageURL string + StatusPageURL string + HealthCheckURL string + MetaData map[string]string +} + +// Response for /eureka/apps +type ApplicationsRootResponse struct { + ApplicationsResponse `json:"applications"` +} + +type ApplicationsResponse struct { + Version string `json:"versions__delta"` + AppsHashcode string `json:"apps__hashcode"` + Applications []Application `json:"application"` +} + +type Application struct { + Name string `json:"name"` + Instance []Instance `json:"instance"` +} + +type RequestInstance struct { + Instance Instance `json:"instance"` +} + +type Instance struct { + InstanceID string `json:"instanceId"` + HostName string `json:"hostName"` + Port Port `json:"port"` + App string `json:"app"` + IPAddr string `json:"ipAddr"` + VipAddress string `json:"vipAddress"` + Status string `json:"status"` + SecurePort Port `json:"securePort"` + HomePageURL string `json:"homePageUrl"` + StatusPageURL string `json:"statusPageUrl"` + HealthCheckURL string `json:"healthCheckUrl"` + DataCenterInfo DataCenterInfo `json:"dataCenterInfo"` + Metadata map[string]string `json:"metadata"` +} + +type Port struct { + Port int `json:"$"` + Enabled string `json:"@enabled"` +} + +type DataCenterInfo struct { + Name string `json:"name"` + Class string `json:"@class"` +} + +var _ APIInterface = new(Client) + +type APIInterface interface { + Register(ctx context.Context, ep Endpoint) error + Deregister(ctx context.Context, appID, instanceID string) error + Heartbeat(ep Endpoint) + FetchApps(ctx context.Context) []Application + FetchAllUpInstances(ctx context.Context) []Instance + FetchAppInstances(ctx context.Context, appID string) (m Application, err error) + FetchAppUpInstances(ctx context.Context, appID string) []Instance + FetchAppInstance(ctx context.Context, appID string, instanceID string) (m Instance, err error) + FetchInstance(ctx context.Context, instanceID string) (m Instance, err error) + Out(ctx context.Context, appID, instanceID string) error + Down(ctx context.Context, appID, instanceID string) error +} + +type ClientOption func(e *Client) + +func WithMaxRetry(maxRetry int) ClientOption { + return func(e *Client) { e.maxRetry = maxRetry } +} + +func WithHeartbeatInterval(interval time.Duration) ClientOption { + return func(e *Client) { + e.heartbeatInterval = interval + } +} + +func WithClientContext(ctx context.Context) ClientOption { + return func(e *Client) { e.ctx = ctx } +} + +func WithNamespace(path string) ClientOption { + return func(e *Client) { e.eurekaPath = path } +} + +type Client struct { + ctx context.Context + urls []string + eurekaPath string + maxRetry int + heartbeatInterval time.Duration + client *http.Client + keepalive map[string]chan struct{} + lock sync.Mutex +} + +func NewClient(urls []string, opts ...ClientOption) *Client { + tr := &http.Transport{ + MaxIdleConns: maxIdleConns, + } + + e := &Client{ + ctx: context.Background(), + urls: urls, + eurekaPath: "eureka/v2", + maxRetry: len(urls), + heartbeatInterval: time.Second * heartbeatTime, + client: &http.Client{Transport: tr, Timeout: time.Second * httpTimeout}, + keepalive: make(map[string]chan struct{}), + } + + for _, o := range opts { + o(e) + } + + return e +} + +func (e *Client) FetchApps(ctx context.Context) []Application { + var m ApplicationsRootResponse + if err := e.do(ctx, "GET", []string{"apps"}, nil, &m); err != nil { + return nil + } + + return m.Applications +} + +func (e *Client) FetchAppInstances(ctx context.Context, appID string) (m Application, err error) { + err = e.do(ctx, "GET", []string{"apps", appID}, nil, &m) + return +} + +func (e *Client) FetchAppUpInstances(ctx context.Context, appID string) []Instance { + app, err := e.FetchAppInstances(ctx, appID) + if err != nil { + return nil + } + return e.filterUp(app) +} + +func (e *Client) FetchAppInstance(ctx context.Context, appID string, instanceID string) (m Instance, err error) { + err = e.do(ctx, "GET", []string{"apps", appID, instanceID}, nil, &m) + return +} + +func (e *Client) FetchInstance(ctx context.Context, instanceID string) (m Instance, err error) { + err = e.do(ctx, "GET", []string{"instances", instanceID}, nil, &m) + return +} + +func (e *Client) Out(ctx context.Context, appID, instanceID string) error { + return e.do(ctx, "PUT", []string{"apps", appID, instanceID, fmt.Sprintf("status?value=%s", statusOutOfServeice)}, nil, nil) +} + +func (e *Client) Down(ctx context.Context, appID, instanceID string) error { + return e.do(ctx, "PUT", []string{"apps", appID, instanceID, fmt.Sprintf("status?value=%s", statusDown)}, nil, nil) +} + +func (e *Client) FetchAllUpInstances(ctx context.Context) []Instance { + return e.filterUp(e.FetchApps(ctx)...) +} + +func (e *Client) Register(ctx context.Context, ep Endpoint) error { + return e.registerEndpoint(ctx, ep) +} + +func (e *Client) Deregister(ctx context.Context, appID, instanceID string) error { + if err := e.do(ctx, "DELETE", []string{"apps", appID, instanceID}, nil, nil); err != nil { + return err + } + go e.cancelHeartbeat(appID) + return nil +} + +func (e *Client) registerEndpoint(ctx context.Context, ep Endpoint) error { + instance := RequestInstance{ + Instance: Instance{ + InstanceID: ep.InstanceID, + HostName: ep.AppID, + Port: Port{ + Port: ep.Port, + Enabled: "true", + }, + App: ep.AppID, + IPAddr: ep.IP, + VipAddress: ep.AppID, + Status: statusUp, + SecurePort: Port{ + Port: ep.SecurePort, + Enabled: "false", + }, + HomePageURL: ep.HomePageURL, + StatusPageURL: ep.StatusPageURL, + HealthCheckURL: ep.HealthCheckURL, + DataCenterInfo: DataCenterInfo{ + Name: "MyOwn", + Class: "com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo", + }, + Metadata: ep.MetaData, + }, + } + + body, err := json.Marshal(instance) + if err != nil { + return err + } + return e.do(ctx, "POST", []string{"apps", ep.AppID}, bytes.NewReader(body), nil) +} + +func (e *Client) Heartbeat(ep Endpoint) { + e.lock.Lock() + e.keepalive[ep.AppID] = make(chan struct{}) + e.lock.Unlock() + + ticker := time.NewTicker(e.heartbeatInterval) + defer ticker.Stop() + retryCount := 0 + for { + select { + case <-e.ctx.Done(): + return + case <-e.keepalive[ep.AppID]: + return + case <-ticker.C: + if err := e.do(e.ctx, "PUT", []string{"apps", ep.AppID, ep.InstanceID}, nil, nil); err != nil { + if retryCount++; retryCount > heartbeatRetry { + _ = e.registerEndpoint(e.ctx, ep) + retryCount = 0 + } + } + } + } +} + +func (e *Client) cancelHeartbeat(appID string) { + defer e.lock.Unlock() + e.lock.Lock() + if ch, ok := e.keepalive[appID]; ok { + ch <- struct{}{} + } +} + +func (e *Client) filterUp(apps ...Application) (res []Instance) { + for _, app := range apps { + for _, ins := range app.Instance { + if ins.Status == statusUp { + res = append(res, ins) + } + } + } + + return +} + +func (e *Client) pickServer(currentTimes int) string { + return e.urls[currentTimes%e.maxRetry] +} + +func (e *Client) shuffle() { + rand.Seed(time.Now().UnixNano()) + rand.Shuffle(len(e.urls), func(i, j int) { + e.urls[i], e.urls[j] = e.urls[j], e.urls[i] + }) +} + +func (e *Client) buildAPI(currentTimes int, params ...string) string { + if currentTimes == 0 { + e.shuffle() + } + server := e.pickServer(currentTimes) + params = append([]string{server, e.eurekaPath}, params...) + return strings.Join(params, "/") +} + +func (e *Client) do(ctx context.Context, method string, params []string, input io.Reader, output interface{}) error { + for i := 0; i < e.maxRetry; i++ { + request, err := http.NewRequest(method, e.buildAPI(i, params...), input) + if err != nil { + return err + } + request = request.WithContext(ctx) + request.Header.Add("User-Agent", "go-eureka-client") + request.Header.Add("Accept", "application/json;charset=UTF-8") + request.Header.Add("Content-Type", "application/json;charset=UTF-8") + resp, err := e.client.Do(request) + if err != nil { + continue + } + defer func() { + _, _ = io.Copy(ioutil.Discard, resp.Body) + resp.Body.Close() + }() + + if output != nil && resp.StatusCode/100 == 2 { + data, err := ioutil.ReadAll(resp.Body) + if err != nil { + return err + } + if err = json.Unmarshal(data, output); err != nil { + return err + } + } + + if resp.StatusCode >= http.StatusBadRequest { + return fmt.Errorf("response Error %d", resp.StatusCode) + } + + return nil + } + return fmt.Errorf("retry after %d times", e.maxRetry) +} diff --git a/contrib/registry/eureka/eureka.go b/contrib/registry/eureka/eureka.go new file mode 100644 index 000000000..ee82d850a --- /dev/null +++ b/contrib/registry/eureka/eureka.go @@ -0,0 +1,136 @@ +package eureka + +import ( + "context" + "strings" + "sync" + "time" +) + +type subscriber struct { + appID string + callBack func() +} + +type API struct { + cli *Client + allInstances map[string][]Instance + subscribers map[string]*subscriber + refreshInterval time.Duration + lock sync.Mutex +} + +func NewAPI(ctx context.Context, client *Client, refreshInterval time.Duration) *API { + e := &API{ + cli: client, + allInstances: make(map[string][]Instance), + subscribers: make(map[string]*subscriber), + refreshInterval: refreshInterval, + } + + // 首次广播一次 + e.broadcast() + + go e.refresh(ctx) + + return e +} + +func (e *API) refresh(ctx context.Context) { + ticker := time.NewTicker(e.refreshInterval) + defer ticker.Stop() + for { + select { + case <-ctx.Done(): + return + case <-ticker.C: + e.broadcast() + } + } +} + +func (e *API) broadcast() { + instances := e.cacheAllInstances() + if instances == nil { + return + } + + for _, subscriber := range e.subscribers { + go subscriber.callBack() + } + defer e.lock.Unlock() + e.lock.Lock() + e.allInstances = instances +} + +func (e *API) cacheAllInstances() map[string][]Instance { + items := make(map[string][]Instance) + instances := e.cli.FetchAllUpInstances(context.Background()) + for _, instance := range instances { + items[e.ToAppID(instance.App)] = append(items[instance.App], instance) + } + + return items +} + +func (e *API) Register(ctx context.Context, serviceName string, endpoints ...Endpoint) error { + appID := e.ToAppID(serviceName) + upInstances := make(map[string]struct{}) + + for _, ins := range e.GetService(ctx, appID) { + upInstances[ins.InstanceID] = struct{}{} + } + + for _, ep := range endpoints { + if _, ok := upInstances[ep.InstanceID]; !ok { + if err := e.cli.Register(ctx, ep); err != nil { + return err + } + go e.cli.Heartbeat(ep) + } + } + + return nil +} + +// Deregister 中的ctx 和 register ctx 是同一个 +func (e *API) Deregister(ctx context.Context, endpoints []Endpoint) error { + for _, ep := range endpoints { + if err := e.cli.Deregister(ctx, ep.AppID, ep.InstanceID); err != nil { + return err + } + } + + return nil +} + +func (e *API) Subscribe(serverName string, fn func()) error { + e.lock.Lock() + defer e.lock.Unlock() + appID := e.ToAppID(serverName) + e.subscribers[appID] = &subscriber{ + appID: appID, + callBack: fn, + } + return nil +} + +func (e *API) GetService(ctx context.Context, serverName string) []Instance { + appID := e.ToAppID(serverName) + if ins, ok := e.allInstances[appID]; ok { + return ins + } + + // 如果不再allinstances 中可以尝试再单独获取一次 + return e.cli.FetchAppUpInstances(ctx, appID) +} + +func (e *API) Unsubscribe(serverName string) { + e.lock.Lock() + defer e.lock.Unlock() + delete(e.subscribers, e.ToAppID(serverName)) +} + +func (e *API) ToAppID(serverName string) string { + return strings.ToUpper(serverName) +} diff --git a/contrib/registry/eureka/go.mod b/contrib/registry/eureka/go.mod new file mode 100644 index 000000000..6123fff15 --- /dev/null +++ b/contrib/registry/eureka/go.mod @@ -0,0 +1,7 @@ +module github.com/go-kratos/kratos/contrib/registry/eureka/v2 + +go 1.16 + +require github.com/go-kratos/kratos/v2 v2.1.5 + +replace github.com/go-kratos/kratos/v2 => ../../../ diff --git a/contrib/registry/eureka/go.sum b/contrib/registry/eureka/go.sum new file mode 100644 index 000000000..cdca5fd91 --- /dev/null +++ b/contrib/registry/eureka/go.sum @@ -0,0 +1,154 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= +github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8= +github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= +github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= +github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= +github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= +github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk= +github.com/cncf/udpa/go v0.0.0-20210930031921-04548b0d99d4/go.mod h1:6pvJx4me5XPnfI9Z40ddWsdw2W/uZgQLFXToKeRcDiI= +github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20210805033703-aa0b78936158/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20210922020428-25de7278fc84/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/cncf/xds/go v0.0.0-20211011173535-cb28da3451f1/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs= +github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= +github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= +github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk= +github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ= +github.com/envoyproxy/go-control-plane v0.9.10-0.20210907150352-cf90f659a021/go.mod h1:AFq3mo9L8Lqqiid3OhADV3RfLJnjiw63cSpi+fDTRC0= +github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/fsnotify/fsnotify v1.5.1/go.mod h1:T3375wBYaZdLLcVNkcVbzGHY7f1l/uK5T5Ai1i3InKU= +github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/go-kratos/aegis v0.1.1/go.mod h1:jYeSQ3Gesba478zEnujOiG5QdsyF3Xk/8owFUeKcHxw= +github.com/go-logr/logr v1.2.0/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.2.1/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/stdr v1.2.0/go.mod h1:YkVgnZu1ZjjL7xTxrfm/LLZBfkhTqSR1ydtm6jTKKwI= +github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= +github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= +github.com/go-playground/form/v4 v4.2.0/go.mod h1:q1a2BY+AQUUzhl6xA/6hBetay6dEIhMHjgvJiGo6K7U= +github.com/golang-jwt/jwt/v4 v4.2.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw= +github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8= +github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA= +github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs= +github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w= +github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0= +github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8= +github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI= +github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk= +github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= +github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M= +github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU= +github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw= +github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA= +github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= +github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= +github.com/shirou/gopsutil/v3 v3.21.8/go.mod h1:YWp/H8Qs5fVmf17v7JNZzA0mPJ+mS2e9JdiUF9LlKzQ= +github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= +github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs= +github.com/tklauser/numcpus v0.3.0/go.mod h1:yFGUr7TUHQRAhyqBcEg0Ge34zDBAsIvJJcyE6boqnA8= +go.opentelemetry.io/otel v1.3.0/go.mod h1:PWIKzi6JCp7sM0k9yZ43VX+T345uNbAkDKwHVjb2PTs= +go.opentelemetry.io/otel/sdk v1.3.0/go.mod h1:rIo4suHNhQwBIPg9axF8V9CA72Wz2mKF1teNrup8yzs= +go.opentelemetry.io/otel/trace v1.3.0/go.mod h1:c/VDhno8888bvQYmbYLqe41/Ldmr/KKunbvWM4/fEjk= +go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= +golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU= +golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc= +golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY= +golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs= +golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc= +google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c= +google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo= +google.golang.org/genproto v0.0.0-20220126215142-9970aeb2e350/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc= +google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c= +google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg= +google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY= +google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk= +google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0= +google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU= +google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34= +google.golang.org/grpc v1.44.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ590SU= +google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8= +google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0= +google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM= +google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE= +google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= +google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c= +google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw= +google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/contrib/registry/eureka/register.go b/contrib/registry/eureka/register.go new file mode 100644 index 000000000..31727242b --- /dev/null +++ b/contrib/registry/eureka/register.go @@ -0,0 +1,146 @@ +package eureka + +import ( + "context" + "fmt" + "strconv" + "strings" + "time" + + "github.com/go-kratos/kratos/v2/registry" +) + +var ( + _ registry.Registrar = &Registry{} + _ registry.Discovery = &Registry{} +) + +type Option func(o *Registry) + +// WithContext with registry context. +func WithContext(ctx context.Context) Option { + return func(o *Registry) { o.ctx = ctx } +} + +func WithHeartbeat(interval time.Duration) Option { + return func(o *Registry) { o.heartbeatInterval = interval } +} + +func WithRefresh(interval time.Duration) Option { + return func(o *Registry) { o.refreshInterval = interval } +} + +func WithEurekaPath(path string) Option { + return func(o *Registry) { o.eurekaPath = path } +} + +type Registry struct { + ctx context.Context + api *API + heartbeatInterval time.Duration + refreshInterval time.Duration + eurekaPath string +} + +func New(eurekaUrls []string, opts ...Option) (*Registry, error) { + r := &Registry{ + ctx: context.Background(), + heartbeatInterval: heartbeatTime, + refreshInterval: refreshTime, + eurekaPath: "eureka/v2", + } + + for _, o := range opts { + o(r) + } + + client := NewClient(eurekaUrls, WithHeartbeatInterval(r.heartbeatInterval), WithClientContext(r.ctx), WithNamespace(r.eurekaPath)) + r.api = NewAPI(r.ctx, client, r.refreshInterval) + return r, nil +} + +// 这里的Context是每个注册器独享的 +func (r *Registry) Register(ctx context.Context, service *registry.ServiceInstance) error { + return r.api.Register(ctx, service.Name, r.Endpoints(service)...) +} + +// Deregister registry service to zookeeper. +func (r *Registry) Deregister(ctx context.Context, service *registry.ServiceInstance) error { + return r.api.Deregister(ctx, r.Endpoints(service)) +} + +// GetService get services from zookeeper +func (r *Registry) GetService(ctx context.Context, serviceName string) ([]*registry.ServiceInstance, error) { + instances := r.api.GetService(ctx, serviceName) + items := make([]*registry.ServiceInstance, 0, len(instances)) + for _, instance := range instances { + items = append(items, ®istry.ServiceInstance{ + ID: instance.Metadata["ID"], + Name: instance.Metadata["Name"], + Version: instance.Metadata["Version"], + Endpoints: []string{instance.Metadata["Endpoints"]}, + Metadata: instance.Metadata, + }) + } + + return items, nil +} + +// watch 是独立的ctx +func (r *Registry) Watch(ctx context.Context, serviceName string) (registry.Watcher, error) { + return newWatch(ctx, r.api, serviceName) +} + +func (r *Registry) Endpoints(service *registry.ServiceInstance) []Endpoint { + var ( + res = []Endpoint{} + start int + ) + for _, ep := range service.Endpoints { + start = strings.Index(ep, "//") + end := strings.LastIndex(ep, ":") + appID := strings.ToUpper(service.Name) + ip := ep[start+2 : end] + sport := ep[end+1:] + port, _ := strconv.Atoi(sport) + securePort := 443 + homePageURL := fmt.Sprintf("%s/", ep) + statusPageURL := fmt.Sprintf("%s/info", ep) + healthCheckURL := fmt.Sprintf("%s/health", ep) + instanceID := strings.Join([]string{ip, appID, sport}, ":") + metadata := make(map[string]string) + if len(service.Metadata) > 0 { + metadata = service.Metadata + } + if s, ok := service.Metadata["securePort"]; ok { + securePort, _ = strconv.Atoi(s) + } + if s, ok := service.Metadata["homePageURL"]; ok { + homePageURL = s + } + if s, ok := service.Metadata["statusPageURL"]; ok { + statusPageURL = s + } + if s, ok := service.Metadata["healthCheckURL"]; ok { + healthCheckURL = s + } + metadata["ID"] = service.ID + metadata["Name"] = service.Name + metadata["Version"] = service.Version + metadata["Endpoints"] = ep + metadata["agent"] = "go-eureka-client" + res = append(res, Endpoint{ + AppID: appID, + IP: ip, + Port: port, + SecurePort: securePort, + HomePageURL: homePageURL, + StatusPageURL: statusPageURL, + HealthCheckURL: healthCheckURL, + InstanceID: instanceID, + MetaData: metadata, + }) + } + + return res +} diff --git a/contrib/registry/eureka/register_test.go b/contrib/registry/eureka/register_test.go new file mode 100644 index 000000000..683b0c587 --- /dev/null +++ b/contrib/registry/eureka/register_test.go @@ -0,0 +1,114 @@ +package eureka + +import ( + "context" + "fmt" + "log" + "sync" + "testing" + "time" + + "github.com/go-kratos/kratos/v2/registry" +) + +func TestRegistry(t *testing.T) { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + s1 := ®istry.ServiceInstance{ + ID: "0", + Name: "helloworld", + Endpoints: []string{"http://127.0.0.1:1111"}, + } + s2 := ®istry.ServiceInstance{ + ID: "0", + Name: "helloworld2", + Endpoints: []string{"http://127.0.0.1:222"}, + } + + r, _ := New([]string{"https://127.0.0.1:18761"}, WithContext(ctx), WithHeartbeat(time.Second), WithRefresh(time.Second), WithEurekaPath("eureka")) + + go do(r, s1) + go do(r, s2) + + time.Sleep(time.Second * 20) + cancel() + time.Sleep(time.Second * 1) +} + +func do(r *Registry, s *registry.ServiceInstance) { + w, err := r.Watch(context.Background(), s.Name) + if err != nil { + log.Fatal(err) + } + defer func() { + _ = w.Stop() + }() + go func() { + for { + res, nextErr := w.Next() + if nextErr != nil { + return + } + log.Printf("watch: %d", len(res)) + for _, r := range res { + log.Printf("next: %+v", r) + } + } + }() + + ctx, cancel := context.WithCancel(context.Background()) + if err = r.Register(ctx, s); err != nil { + log.Fatal(err) + } + + time.Sleep(time.Second * 10) + res, err := r.GetService(ctx, s.Name) + if err != nil { + log.Fatal(err) + } + for i, re := range res { + log.Printf("first %d re:%v\n", i, re) + } + + if len(res) != 1 && res[0].Name != s.Name { + log.Fatalf("not expected: %+v", res) + } + + if err = r.Deregister(ctx, s); err != nil { + log.Fatal(err) + } + cancel() + time.Sleep(time.Second * 10) + + res, err = r.GetService(ctx, s.Name) + if err != nil { + log.Fatal(err) + } + for i, re := range res { + log.Printf("second %d re:%v\n", i, re) + } + if len(res) != 0 { + log.Fatalf("not expected empty") + } +} + +func TestLock(t *testing.T) { + type me struct { + lock sync.Mutex + } + + a := &me{} + go func() { + defer a.lock.Unlock() + a.lock.Lock() + fmt.Println("This is fmt first.") + time.Sleep(time.Second * 5) + }() + go func() { + defer a.lock.Unlock() + a.lock.Lock() + fmt.Println("This is fmt second.") + time.Sleep(time.Second * 5) + }() + time.Sleep(time.Second * 10) +} diff --git a/contrib/registry/eureka/watcher.go b/contrib/registry/eureka/watcher.go new file mode 100644 index 000000000..468d97b24 --- /dev/null +++ b/contrib/registry/eureka/watcher.go @@ -0,0 +1,60 @@ +package eureka + +import ( + "context" + + "github.com/go-kratos/kratos/v2/registry" +) + +var _ registry.Watcher = &watcher{} + +type watcher struct { + ctx context.Context + cancel context.CancelFunc + cli *API + watchChan chan struct{} + serverName string +} + +func newWatch(ctx context.Context, cli *API, serverName string) (*watcher, error) { + w := &watcher{ + ctx: ctx, + cli: cli, + serverName: serverName, + watchChan: make(chan struct{}, 1), + } + w.ctx, w.cancel = context.WithCancel(ctx) + e := w.cli.Subscribe( + serverName, + func() { + w.watchChan <- struct{}{} + }, + ) + return w, e +} + +func (w watcher) Next() (services []*registry.ServiceInstance, err error) { + select { + case <-w.ctx.Done(): + return nil, w.ctx.Err() + case <-w.watchChan: + instances := w.cli.GetService(w.ctx, w.serverName) + services = make([]*registry.ServiceInstance, 0, len(instances)) + for _, instance := range instances { + services = append(services, ®istry.ServiceInstance{ + ID: instance.Metadata["ID"], + Name: instance.Metadata["Name"], + Version: instance.Metadata["Version"], + Endpoints: []string{instance.Metadata["Endpoints"]}, + Metadata: instance.Metadata, + }) + } + return + } +} + +func (w *watcher) Stop() error { + w.cancel() + w.cli.Unsubscribe(w.serverName) + return nil +} diff --git a/hack/.test_ignored_files b/hack/.test_ignored_files index d87870a64..52d736ef4 100644 --- a/hack/.test_ignored_files +++ b/hack/.test_ignored_files @@ -7,3 +7,4 @@ ./contrib/registry/zookeeper ./contrib/registry/etcd ./contrib/registry/consul +./contrib/registry/eureka