You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
347 lines
8.9 KiB
347 lines
8.9 KiB
package eureka
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"math/rand"
|
|
"net/http"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
const (
|
|
statusUp = "UP"
|
|
statusDown = "DOWN"
|
|
statusOutOfServeice = "OUT_OF_SERVICE"
|
|
heartbeatRetry = 3
|
|
maxIdleConns = 100
|
|
heartbeatTime = 10
|
|
httpTimeout = 3
|
|
refreshTime = 30
|
|
)
|
|
|
|
type Endpoint struct {
|
|
InstanceID string
|
|
IP string
|
|
AppID string
|
|
Port int
|
|
SecurePort int
|
|
HomePageURL string
|
|
StatusPageURL string
|
|
HealthCheckURL string
|
|
MetaData map[string]string
|
|
}
|
|
|
|
// Response for /eureka/apps
|
|
type ApplicationsRootResponse struct {
|
|
ApplicationsResponse `json:"applications"`
|
|
}
|
|
|
|
type ApplicationsResponse struct {
|
|
Version string `json:"versions__delta"`
|
|
AppsHashcode string `json:"apps__hashcode"`
|
|
Applications []Application `json:"application"`
|
|
}
|
|
|
|
type Application struct {
|
|
Name string `json:"name"`
|
|
Instance []Instance `json:"instance"`
|
|
}
|
|
|
|
type RequestInstance struct {
|
|
Instance Instance `json:"instance"`
|
|
}
|
|
|
|
type Instance struct {
|
|
InstanceID string `json:"instanceId"`
|
|
HostName string `json:"hostName"`
|
|
Port Port `json:"port"`
|
|
App string `json:"app"`
|
|
IPAddr string `json:"ipAddr"`
|
|
VipAddress string `json:"vipAddress"`
|
|
Status string `json:"status"`
|
|
SecurePort Port `json:"securePort"`
|
|
HomePageURL string `json:"homePageUrl"`
|
|
StatusPageURL string `json:"statusPageUrl"`
|
|
HealthCheckURL string `json:"healthCheckUrl"`
|
|
DataCenterInfo DataCenterInfo `json:"dataCenterInfo"`
|
|
Metadata map[string]string `json:"metadata"`
|
|
}
|
|
|
|
type Port struct {
|
|
Port int `json:"$"`
|
|
Enabled string `json:"@enabled"`
|
|
}
|
|
|
|
type DataCenterInfo struct {
|
|
Name string `json:"name"`
|
|
Class string `json:"@class"`
|
|
}
|
|
|
|
var _ APIInterface = new(Client)
|
|
|
|
type APIInterface interface {
|
|
Register(ctx context.Context, ep Endpoint) error
|
|
Deregister(ctx context.Context, appID, instanceID string) error
|
|
Heartbeat(ep Endpoint)
|
|
FetchApps(ctx context.Context) []Application
|
|
FetchAllUpInstances(ctx context.Context) []Instance
|
|
FetchAppInstances(ctx context.Context, appID string) (m Application, err error)
|
|
FetchAppUpInstances(ctx context.Context, appID string) []Instance
|
|
FetchAppInstance(ctx context.Context, appID string, instanceID string) (m Instance, err error)
|
|
FetchInstance(ctx context.Context, instanceID string) (m Instance, err error)
|
|
Out(ctx context.Context, appID, instanceID string) error
|
|
Down(ctx context.Context, appID, instanceID string) error
|
|
}
|
|
|
|
type ClientOption func(e *Client)
|
|
|
|
func WithMaxRetry(maxRetry int) ClientOption {
|
|
return func(e *Client) { e.maxRetry = maxRetry }
|
|
}
|
|
|
|
func WithHeartbeatInterval(interval time.Duration) ClientOption {
|
|
return func(e *Client) {
|
|
e.heartbeatInterval = interval
|
|
}
|
|
}
|
|
|
|
func WithClientContext(ctx context.Context) ClientOption {
|
|
return func(e *Client) { e.ctx = ctx }
|
|
}
|
|
|
|
func WithNamespace(path string) ClientOption {
|
|
return func(e *Client) { e.eurekaPath = path }
|
|
}
|
|
|
|
type Client struct {
|
|
ctx context.Context
|
|
urls []string
|
|
eurekaPath string
|
|
maxRetry int
|
|
heartbeatInterval time.Duration
|
|
client *http.Client
|
|
keepalive map[string]chan struct{}
|
|
lock sync.Mutex
|
|
}
|
|
|
|
func NewClient(urls []string, opts ...ClientOption) *Client {
|
|
tr := &http.Transport{
|
|
MaxIdleConns: maxIdleConns,
|
|
}
|
|
|
|
e := &Client{
|
|
ctx: context.Background(),
|
|
urls: urls,
|
|
eurekaPath: "eureka/v2",
|
|
maxRetry: len(urls),
|
|
heartbeatInterval: time.Second * heartbeatTime,
|
|
client: &http.Client{Transport: tr, Timeout: time.Second * httpTimeout},
|
|
keepalive: make(map[string]chan struct{}),
|
|
}
|
|
|
|
for _, o := range opts {
|
|
o(e)
|
|
}
|
|
|
|
return e
|
|
}
|
|
|
|
func (e *Client) FetchApps(ctx context.Context) []Application {
|
|
var m ApplicationsRootResponse
|
|
if err := e.do(ctx, "GET", []string{"apps"}, nil, &m); err != nil {
|
|
return nil
|
|
}
|
|
|
|
return m.Applications
|
|
}
|
|
|
|
func (e *Client) FetchAppInstances(ctx context.Context, appID string) (m Application, err error) {
|
|
err = e.do(ctx, "GET", []string{"apps", appID}, nil, &m)
|
|
return
|
|
}
|
|
|
|
func (e *Client) FetchAppUpInstances(ctx context.Context, appID string) []Instance {
|
|
app, err := e.FetchAppInstances(ctx, appID)
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
return e.filterUp(app)
|
|
}
|
|
|
|
func (e *Client) FetchAppInstance(ctx context.Context, appID string, instanceID string) (m Instance, err error) {
|
|
err = e.do(ctx, "GET", []string{"apps", appID, instanceID}, nil, &m)
|
|
return
|
|
}
|
|
|
|
func (e *Client) FetchInstance(ctx context.Context, instanceID string) (m Instance, err error) {
|
|
err = e.do(ctx, "GET", []string{"instances", instanceID}, nil, &m)
|
|
return
|
|
}
|
|
|
|
func (e *Client) Out(ctx context.Context, appID, instanceID string) error {
|
|
return e.do(ctx, "PUT", []string{"apps", appID, instanceID, fmt.Sprintf("status?value=%s", statusOutOfServeice)}, nil, nil)
|
|
}
|
|
|
|
func (e *Client) Down(ctx context.Context, appID, instanceID string) error {
|
|
return e.do(ctx, "PUT", []string{"apps", appID, instanceID, fmt.Sprintf("status?value=%s", statusDown)}, nil, nil)
|
|
}
|
|
|
|
func (e *Client) FetchAllUpInstances(ctx context.Context) []Instance {
|
|
return e.filterUp(e.FetchApps(ctx)...)
|
|
}
|
|
|
|
func (e *Client) Register(ctx context.Context, ep Endpoint) error {
|
|
return e.registerEndpoint(ctx, ep)
|
|
}
|
|
|
|
func (e *Client) Deregister(ctx context.Context, appID, instanceID string) error {
|
|
if err := e.do(ctx, "DELETE", []string{"apps", appID, instanceID}, nil, nil); err != nil {
|
|
return err
|
|
}
|
|
go e.cancelHeartbeat(appID)
|
|
return nil
|
|
}
|
|
|
|
func (e *Client) registerEndpoint(ctx context.Context, ep Endpoint) error {
|
|
instance := RequestInstance{
|
|
Instance: Instance{
|
|
InstanceID: ep.InstanceID,
|
|
HostName: ep.AppID,
|
|
Port: Port{
|
|
Port: ep.Port,
|
|
Enabled: "true",
|
|
},
|
|
App: ep.AppID,
|
|
IPAddr: ep.IP,
|
|
VipAddress: ep.AppID,
|
|
Status: statusUp,
|
|
SecurePort: Port{
|
|
Port: ep.SecurePort,
|
|
Enabled: "false",
|
|
},
|
|
HomePageURL: ep.HomePageURL,
|
|
StatusPageURL: ep.StatusPageURL,
|
|
HealthCheckURL: ep.HealthCheckURL,
|
|
DataCenterInfo: DataCenterInfo{
|
|
Name: "MyOwn",
|
|
Class: "com.netflix.appinfo.InstanceInfo$DefaultDataCenterInfo",
|
|
},
|
|
Metadata: ep.MetaData,
|
|
},
|
|
}
|
|
|
|
body, err := json.Marshal(instance)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return e.do(ctx, "POST", []string{"apps", ep.AppID}, bytes.NewReader(body), nil)
|
|
}
|
|
|
|
func (e *Client) Heartbeat(ep Endpoint) {
|
|
e.lock.Lock()
|
|
e.keepalive[ep.AppID] = make(chan struct{})
|
|
e.lock.Unlock()
|
|
|
|
ticker := time.NewTicker(e.heartbeatInterval)
|
|
defer ticker.Stop()
|
|
retryCount := 0
|
|
for {
|
|
select {
|
|
case <-e.ctx.Done():
|
|
return
|
|
case <-e.keepalive[ep.AppID]:
|
|
return
|
|
case <-ticker.C:
|
|
if err := e.do(e.ctx, "PUT", []string{"apps", ep.AppID, ep.InstanceID}, nil, nil); err != nil {
|
|
if retryCount++; retryCount > heartbeatRetry {
|
|
_ = e.registerEndpoint(e.ctx, ep)
|
|
retryCount = 0
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (e *Client) cancelHeartbeat(appID string) {
|
|
defer e.lock.Unlock()
|
|
e.lock.Lock()
|
|
if ch, ok := e.keepalive[appID]; ok {
|
|
ch <- struct{}{}
|
|
}
|
|
}
|
|
|
|
func (e *Client) filterUp(apps ...Application) (res []Instance) {
|
|
for _, app := range apps {
|
|
for _, ins := range app.Instance {
|
|
if ins.Status == statusUp {
|
|
res = append(res, ins)
|
|
}
|
|
}
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
func (e *Client) pickServer(currentTimes int) string {
|
|
return e.urls[currentTimes%e.maxRetry]
|
|
}
|
|
|
|
func (e *Client) shuffle() {
|
|
rand.Seed(time.Now().UnixNano())
|
|
rand.Shuffle(len(e.urls), func(i, j int) {
|
|
e.urls[i], e.urls[j] = e.urls[j], e.urls[i]
|
|
})
|
|
}
|
|
|
|
func (e *Client) buildAPI(currentTimes int, params ...string) string {
|
|
if currentTimes == 0 {
|
|
e.shuffle()
|
|
}
|
|
server := e.pickServer(currentTimes)
|
|
params = append([]string{server, e.eurekaPath}, params...)
|
|
return strings.Join(params, "/")
|
|
}
|
|
|
|
func (e *Client) do(ctx context.Context, method string, params []string, input io.Reader, output interface{}) error {
|
|
for i := 0; i < e.maxRetry; i++ {
|
|
request, err := http.NewRequest(method, e.buildAPI(i, params...), input)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
request = request.WithContext(ctx)
|
|
request.Header.Add("User-Agent", "go-eureka-client")
|
|
request.Header.Add("Accept", "application/json;charset=UTF-8")
|
|
request.Header.Add("Content-Type", "application/json;charset=UTF-8")
|
|
resp, err := e.client.Do(request)
|
|
if err != nil {
|
|
continue
|
|
}
|
|
defer func() {
|
|
_, _ = io.Copy(ioutil.Discard, resp.Body)
|
|
resp.Body.Close()
|
|
}()
|
|
|
|
if output != nil && resp.StatusCode/100 == 2 {
|
|
data, err := ioutil.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if err = json.Unmarshal(data, output); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
if resp.StatusCode >= http.StatusBadRequest {
|
|
return fmt.Errorf("response Error %d", resp.StatusCode)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
return fmt.Errorf("retry after %d times", e.maxRetry)
|
|
}
|
|
|