feat(registry): support discovery registry center (#1480)

* feat(registry): support discovery registry center
pull/1521/head
Yeqllo 3 years ago committed by GitHub
parent bae20ba735
commit 5786f61e13
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      contrib/registry/discovery/README.md
  2. 494
      contrib/registry/discovery/discovery.go
  3. 185
      contrib/registry/discovery/discovery_helper.go
  4. 11
      contrib/registry/discovery/go.mod
  5. 158
      contrib/registry/discovery/go.sum
  6. 81
      contrib/registry/discovery/impl_discover.go
  7. 127
      contrib/registry/discovery/impl_registrar.go
  8. 2
      examples/go.mod
  9. 2
      examples/go.sum
  10. 58
      examples/registry/discovery/client/main.go
  11. 70
      examples/registry/discovery/server/main.go

@ -0,0 +1,2 @@
## [discovery](https://github.com/bilibili/discovery)

@ -0,0 +1,494 @@
package discovery
import (
"context"
"fmt"
"math/rand"
"net/url"
"os"
"strconv"
"sync"
"sync/atomic"
"time"
"github.com/go-resty/resty/v2"
"github.com/pkg/errors"
"github.com/go-kratos/kratos/v2/log"
)
type Discovery struct {
config *Config
once sync.Once
ctx context.Context
cancelFunc context.CancelFunc
httpClient *resty.Client
node atomic.Value
nodeIdx uint64
mutex sync.RWMutex
apps map[string]*appInfo
registry map[string]struct{}
lastHost string
cancelPolls context.CancelFunc
logger log.Logger
}
type appInfo struct {
resolver map[*Resolve]struct{}
zoneIns atomic.Value
lastTs int64 // latest timestamp
}
// New construct a Discovery instance which implements registry.Registrar,
// registry.Discovery and registry.Watcher.
func New(c *Config, logger log.Logger) *Discovery {
if logger == nil {
logger = log.NewStdLogger(os.Stdout)
logger = log.With(logger,
"registry.pluginName", "Discovery",
"ts", log.DefaultTimestamp,
"caller", log.DefaultCaller,
)
}
if c == nil {
c = new(Config)
}
if err := fixConfig(c); err != nil {
panic(err)
}
ctx, cancel := context.WithCancel(context.Background())
d := &Discovery{
config: c,
ctx: ctx,
cancelFunc: cancel,
apps: map[string]*appInfo{},
registry: map[string]struct{}{},
logger: logger,
}
d.httpClient = resty.New().
SetTimeout(40 * time.Second)
// Discovery self found and watch
r := d.resolveBuild(_discoveryAppID)
event := r.Watch()
_, ok := <-event
if !ok {
panic("Discovery watch self failed")
}
discoveryIns, ok := r.Fetch(context.Background())
if ok {
d.newSelf(discoveryIns.Instances)
}
go d.selfProc(r, event)
return d
}
// Close stop all running process including Discovery and register
func (d *Discovery) Close() error {
d.cancelFunc()
return nil
}
func (d *Discovery) Logger() *log.Helper {
return log.NewHelper(d.logger)
}
// selfProc start a goroutine to refresh Discovery self registration information.
func (d *Discovery) selfProc(resolver *Resolve, event <-chan struct{}) {
for {
_, ok := <-event
if !ok {
return
}
zones, ok := resolver.Fetch(context.Background())
if ok {
d.newSelf(zones.Instances)
}
}
}
// newSelf
func (d *Discovery) newSelf(zones map[string][]*discoveryInstance) {
ins, ok := zones[d.config.Zone]
if !ok {
return
}
var nodes []string
for _, in := range ins {
for _, addr := range in.Addrs {
u, err := url.Parse(addr)
if err == nil && u.Scheme == "http" {
nodes = append(nodes, u.Host)
}
}
}
// diff old nodes
var olds int
for _, n := range nodes {
if node, ok := d.node.Load().([]string); ok {
for _, o := range node {
if o == n {
olds++
break
}
}
}
}
if len(nodes) == olds {
return
}
rand.Shuffle(len(nodes), func(i, j int) {
nodes[i], nodes[j] = nodes[j], nodes[i]
})
d.node.Store(nodes)
}
// resolveBuild Discovery resolver builder.
func (d *Discovery) resolveBuild(appID string) *Resolve {
r := &Resolve{
id: appID,
d: d,
event: make(chan struct{}, 1),
}
d.mutex.Lock()
app, ok := d.apps[appID]
if !ok {
app = &appInfo{
resolver: make(map[*Resolve]struct{}),
}
d.apps[appID] = app
cancel := d.cancelPolls
if cancel != nil {
cancel()
}
}
app.resolver[r] = struct{}{}
d.mutex.Unlock()
if ok {
select {
case r.event <- struct{}{}:
default:
}
}
d.Logger().Debugf("Discovery: AddWatch(%s) already watch(%v)", appID, ok)
d.once.Do(func() {
go d.serverProc()
})
return r
}
func (d *Discovery) serverProc() {
defer d.Logger().Debug("Discovery serverProc quit")
var (
retry int
ctx context.Context
cancel context.CancelFunc
)
ticker := time.NewTicker(time.Minute * 30)
defer ticker.Stop()
for {
if ctx == nil {
ctx, cancel = context.WithCancel(d.ctx)
d.mutex.Lock()
d.cancelPolls = cancel
d.mutex.Unlock()
}
select {
case <-d.ctx.Done():
return
case <-ticker.C:
d.switchNode()
default:
}
apps, err := d.polls(ctx)
if err != nil {
d.switchNode()
if ctx.Err() == context.Canceled {
ctx = nil
continue
}
time.Sleep(time.Second)
retry++
continue
}
retry = 0
d.broadcast(apps)
}
}
func (d *Discovery) pickNode() string {
nodes, ok := d.node.Load().([]string)
if !ok || len(nodes) == 0 {
return d.config.Nodes[rand.Intn(len(d.config.Nodes))]
}
return nodes[atomic.LoadUint64(&d.nodeIdx)%uint64(len(nodes))]
}
func (d *Discovery) switchNode() {
atomic.AddUint64(&d.nodeIdx, 1)
}
// renew an instance with Discovery
func (d *Discovery) renew(ctx context.Context, ins *discoveryInstance) (err error) {
// d.Logger().Debug("Discovery:renew renew calling")
d.mutex.RLock()
c := d.config
d.mutex.RUnlock()
res := new(discoveryCommonResp)
uri := fmt.Sprintf(_renewURL, d.pickNode())
// construct parameters to renew
p := newParams(d.config)
p.Set(_paramKeyAppID, ins.AppID)
// send request to Discovery server.
if _, err = d.httpClient.R().
SetContext(ctx).
SetQueryParamsFromValues(p).
SetResult(&res).
Post(uri); err != nil {
d.switchNode()
d.Logger().Errorf("Discovery: renew client.Get(%v) env(%s) appid(%s) hostname(%s) error(%v)",
uri, c.Env, ins.AppID, c.Host, err)
return
}
if res.Code != _codeOK {
err = fmt.Errorf("discovery.renew failed ErrorCode: %d", res.Code)
if res.Code == _codeNotFound {
if err = d.register(ctx, ins); err != nil {
err = errors.Wrap(err, "Discovery.renew instance, and failed to register ins")
}
return
}
d.Logger().Errorf(
"Discovery: renew client.Get(%v) env(%s) appid(%s) hostname(%s) code(%v)",
uri, c.Env, ins.AppID, c.Host, res.Code,
)
}
return
}
// cancel Remove the registered instance from Discovery
func (d *Discovery) cancel(ins *discoveryInstance) (err error) {
d.mutex.RLock()
config := d.config
d.mutex.RUnlock()
res := new(discoveryCommonResp)
uri := fmt.Sprintf(_cancelURL, d.pickNode())
p := newParams(d.config)
p.Set(_paramKeyAppID, ins.AppID)
// request
// send request to Discovery server.
if _, err = d.httpClient.R().
SetContext(context.TODO()).
SetQueryParamsFromValues(p).
SetResult(&res).
Post(uri); err != nil {
d.switchNode()
d.Logger().Errorf("Discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) error(%v)",
uri, config.Env, ins.AppID, config.Host, err)
return
}
// handle response error
if res.Code != _codeOK {
if res.Code == _codeNotFound {
return nil
}
d.Logger().Warnf("Discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) code(%v)",
uri, config.Env, ins.AppID, config.Host, res.Code)
err = fmt.Errorf("ErrorCode: %d", res.Code)
return
}
return
}
func (d *Discovery) broadcast(apps map[string]*disInstancesInfo) {
for appID, v := range apps {
var count int
// v maybe nil in old version(less than v1.1) Discovery,check incase of panic
if v == nil {
continue
}
for zone, ins := range v.Instances {
if len(ins) == 0 {
delete(v.Instances, zone)
}
count += len(ins)
}
if count == 0 {
continue
}
d.mutex.RLock()
app, ok := d.apps[appID]
d.mutex.RUnlock()
if ok {
app.lastTs = v.LastTs
app.zoneIns.Store(v)
d.mutex.RLock()
for rs := range app.resolver {
select {
case rs.event <- struct{}{}:
default:
}
}
d.mutex.RUnlock()
}
}
}
func (d *Discovery) polls(ctx context.Context) (apps map[string]*disInstancesInfo, err error) {
var (
lastTss = make([]int64, 0, 4)
appIDs = make([]string, 0, 16)
host = d.pickNode()
changed bool
)
if host != d.lastHost {
d.lastHost = host
changed = true
}
d.mutex.RLock()
config := d.config
for k, v := range d.apps {
if changed {
v.lastTs = 0
}
appIDs = append(appIDs, k)
lastTss = append(lastTss, v.lastTs)
}
d.mutex.RUnlock()
// if there is no app, polls just return.
if len(appIDs) == 0 {
return
}
uri := fmt.Sprintf(_pollURL, host)
res := new(discoveryPollsResp)
// params
p := newParams(nil)
p.Set(_paramKeyEnv, config.Env)
p.Set(_paramKeyHostname, config.Host)
for _, appID := range appIDs {
p.Add(_paramKeyAppID, appID)
}
for _, ts := range lastTss {
p.Add("latest_timestamp", strconv.FormatInt(ts, 10))
}
// request
reqURI := uri + "?" + p.Encode()
if _, err = d.httpClient.R().
SetContext(ctx).
SetQueryParamsFromValues(p).
SetResult(res).Get(uri); err != nil {
d.switchNode()
d.Logger().Errorf("Discovery: client.Get(%s) error(%+v)", reqURI, err)
return nil, err
}
if res.Code != _codeOK {
if res.Code != _codeNotModified {
d.Logger().Errorf("Discovery: client.Get(%s) get error code(%d)", reqURI, res.Code)
}
err = fmt.Errorf("discovery.polls failed ErrCode: %d", res.Code)
return
}
for _, app := range res.Data {
if app.LastTs == 0 {
err = ErrServerError
d.Logger().Errorf("Discovery: client.Get(%s) latest_timestamp is 0, instances:(%+v)", reqURI, res.Data)
return
}
}
d.Logger().Debugf("Discovery: successfully polls(%s) instances (%+v)", reqURI, res.Data)
apps = res.Data
return
}
// Resolve Discovery resolver.
type Resolve struct {
id string
event chan struct{}
d *Discovery
}
// Watch instance.
func (r *Resolve) Watch() <-chan struct{} {
return r.event
}
// Fetch resolver instance.
func (r *Resolve) Fetch(ctx context.Context) (ins *disInstancesInfo, ok bool) {
r.d.mutex.RLock()
app, ok := r.d.apps[r.id]
r.d.mutex.RUnlock()
if ok {
var appIns *disInstancesInfo
appIns, ok = app.zoneIns.Load().(*disInstancesInfo)
if !ok {
return
}
ins = new(disInstancesInfo)
ins.LastTs = appIns.LastTs
ins.Scheduler = appIns.Scheduler
ins.Instances = make(map[string][]*discoveryInstance)
for zone, in := range appIns.Instances {
ins.Instances[zone] = in
}
//if r.opt.Filter != nil {
// ins.Instances = r.opt.Filter(appIns.Instances)
//} else {
// ins.Instances = make(map[string][]*discoveryInstance)
// for zone, in := range appIns.Instances {
// ins.Instances[zone] = in
// }
//}
//if r.opt.scheduler != nil {
// ins.Instances[r.opt.ClientZone] = r.opt.scheduler(ins)
//}
//if r.opt.Subset != nil && r.opt.SubsetSize != 0 {
// for zone, inss := range ins.Instances {
// ins.Instances[zone] = r.opt.Subset(inss, r.opt.SubsetSize)
// }
//}
}
return
}
// Close resolver
func (r *Resolve) Close() error {
r.d.mutex.Lock()
if app, ok := r.d.apps[r.id]; ok && len(app.resolver) != 0 {
delete(app.resolver, r)
// TODO: delete app from builder
}
r.d.mutex.Unlock()
return nil
}

@ -0,0 +1,185 @@
package discovery
import (
"fmt"
"net/url"
"os"
"strconv"
"time"
"github.com/pkg/errors"
"github.com/go-kratos/kratos/v2/registry"
)
var (
ErrDuplication = errors.New("register failed: instance duplicated: ")
ErrServerError = errors.New("server error")
)
const (
// Discovery server resource uri
_registerURL = "http://%s/discovery/register"
//_setURL = "http://%s/discovery/set"
_cancelURL = "http://%s/discovery/cancel"
_renewURL = "http://%s/discovery/renew"
_pollURL = "http://%s/discovery/polls"
// Discovery server error codes
_codeOK = 0
_codeNotFound = -404
_codeNotModified = -304
//_SERVER_ERROR = -500
// _registerGap is the gap to renew instance registration.
_registerGap = 30 * time.Second
_statusUP = "1"
_discoveryAppID = "infra.discovery"
)
// Config Discovery configures.
type Config struct {
Nodes []string
Region string
Zone string
Env string
Host string
}
func fixConfig(c *Config) error {
if c.Host == "" {
c.Host, _ = os.Hostname()
}
if len(c.Nodes) == 0 || c.Region == "" || c.Zone == "" || c.Env == "" || c.Host == "" {
return fmt.Errorf(
"invalid Discovery config nodes:%+v region:%s zone:%s deployEnv:%s host:%s",
c.Nodes,
c.Region,
c.Zone,
c.Env,
c.Host,
)
}
return nil
}
// discoveryInstance represents a server the client connects to.
type discoveryInstance struct {
Region string `json:"region"` // Region is region.
Zone string `json:"zone"` // Zone is IDC.
Env string `json:"env"` // Env prod/pre/uat/fat1
AppID string `json:"appid"` // AppID is mapping service-tree appId.
Hostname string `json:"hostname"` // Hostname is hostname from docker
Addrs []string `json:"addrs"` // Addrs is the address of app instance format: scheme://host
Version string `json:"version"` // Version is publishing version.
LastTs int64 `json:"latest_timestamp"` // LastTs is instance latest updated timestamp
// Metadata is the information associated with Addr, which may be used to make load balancing decision.
Metadata map[string]string `json:"metadata"`
Status int64 `json:"status"` // Status instance status, eg: 1UP 2Waiting
}
const _reservedInstanceIDKey = "kratos.v2.serviceinstance.id"
// fromServerInstance convert registry.ServiceInstance into discoveryInstance
func fromServerInstance(ins *registry.ServiceInstance, config *Config) *discoveryInstance {
if ins == nil {
return nil
}
metadata := ins.Metadata
if ins.Metadata == nil {
metadata = make(map[string]string, 8)
}
metadata[_reservedInstanceIDKey] = ins.ID
return &discoveryInstance{
Region: config.Region,
Zone: config.Zone,
Env: config.Env,
AppID: ins.Name,
Hostname: config.Host,
Addrs: ins.Endpoints,
Version: ins.Version,
LastTs: time.Now().Unix(),
Metadata: metadata,
Status: 1,
}
}
// toServiceInstance convert discoveryInstance into registry.ServiceInstance
func toServiceInstance(ins *discoveryInstance) *registry.ServiceInstance {
if ins == nil {
return nil
}
return &registry.ServiceInstance{
ID: ins.Metadata[_reservedInstanceIDKey],
Name: ins.AppID,
Version: ins.Version,
Metadata: map[string]string{
"region": ins.Region,
"zone": ins.Region,
"lastTs": strconv.Itoa(int(ins.LastTs)),
"env": ins.Env,
"hostname": ins.Hostname,
},
Endpoints: ins.Addrs,
}
}
// disInstancesInfo instance info.
type disInstancesInfo struct {
Instances map[string][]*discoveryInstance `json:"instances"`
LastTs int64 `json:"latest_timestamp"`
Scheduler *scheduler `json:"scheduler"`
}
// scheduler scheduler.
type scheduler struct {
Clients map[string]*zoneStrategy `json:"clients"`
}
// zoneStrategy is the scheduling strategy of all zones
type zoneStrategy struct {
Zones map[string]*strategy `json:"zones"`
}
// strategy is zone scheduling strategy.
type strategy struct {
Weight int64 `json:"weight"`
}
const (
_paramKeyRegion = "region"
_paramKeyZone = "zone"
_paramKeyEnv = "env"
_paramKeyHostname = "hostname"
_paramKeyAppID = "appid"
_paramKeyAddrs = "addrs"
_paramKeyVersion = "version"
_paramKeyStatus = "status"
_paramKeyMetadata = "metadata"
)
func newParams(c *Config) url.Values {
p := make(url.Values, 8)
if c == nil {
return p
}
p.Set(_paramKeyRegion, c.Region)
p.Set(_paramKeyZone, c.Zone)
p.Set(_paramKeyEnv, c.Env)
p.Set(_paramKeyHostname, c.Host)
return p
}
type discoveryCommonResp struct {
Code int `json:"code"`
Message string `json:"message"`
}
type discoveryPollsResp struct {
Code int `json:"code"`
Data map[string]*disInstancesInfo `json:"data"`
}

@ -0,0 +1,11 @@
module github.com/go-kratos/kratos/contrib/registry/discovery/v2
go 1.16
require (
github.com/go-kratos/kratos/v2 v2.0.5
github.com/go-resty/resty/v2 v2.6.0
github.com/pkg/errors v0.9.1
)
replace github.com/go-kratos/kratos/v2 v2.0.5 => ../../../

@ -0,0 +1,158 @@
cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/StackExchange/wmi v1.2.1/go.mod h1:rcmrprowKIVzvc+NUiLncP2uuArMWLCbu9SBzvHz7e8=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
github.com/cncf/udpa/go v0.0.0-20201120205902-5459f2c99403/go.mod h1:WmhPx2Nbnhtbo57+VJT5O0JRkEi1Wbu0z5j0R8u5Hbk=
github.com/cncf/xds/go v0.0.0-20210312221358-fbca930ec8ed/go.mod h1:eXthEFrGJvWHgFFCl3hGmgk+/aYT6PnTQLykKQRLhEs=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
github.com/envoyproxy/go-control-plane v0.9.9-0.20201210154907-fd9021fe5dad/go.mod h1:cXg6YxExXjJnVBQHBLXeUAgxn2UodCpnH306RInaBQk=
github.com/envoyproxy/go-control-plane v0.9.9-0.20210512163311-63b5d3c536b0/go.mod h1:hliV/p42l8fGbc6Y9bQ70uLwIvmJyVE5k4iMKlh8wCQ=
github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-kratos/aegis v0.1.1/go.mod h1:jYeSQ3Gesba478zEnujOiG5QdsyF3Xk/8owFUeKcHxw=
github.com/go-ole/go-ole v1.2.5/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0=
github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4=
github.com/go-playground/form/v4 v4.2.0/go.mod h1:q1a2BY+AQUUzhl6xA/6hBetay6dEIhMHjgvJiGo6K7U=
github.com/go-resty/resty/v2 v2.6.0 h1:joIR5PNLM2EFqqESUjCMGXrWmXNHEU9CEiK813oKYS4=
github.com/go-resty/resty/v2 v2.6.0/go.mod h1:PwvJS6hvaPkjtjNg9ph+VrSD92bi5Zq73w/BIH7cC3Q=
github.com/golang-jwt/jwt/v4 v4.0.0/go.mod h1:/xlHOz8bRuivTWchD4jCa+NbatV+wEUSzwAxVc6locg=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.3/go.mod h1:vzj43D7+SQXF/4pzW/hwtAqwc6iTitCiVSaWz5lYuqw=
github.com/golang/protobuf v1.4.0-rc.1/go.mod h1:ceaxUfeHdC40wWswd/P6IGgMaK3YpKi5j83Wpe3EHw8=
github.com/golang/protobuf v1.4.0-rc.1.0.20200221234624-67d41d38c208/go.mod h1:xKAWHe0F5eneWXFV3EuXVDTCmh+JuBKY0li0aMyXATA=
github.com/golang/protobuf v1.4.0-rc.2/go.mod h1:LlEzMj4AhA7rCAGe4KMBDvJI+AwstrUpVNzEA03Pprs=
github.com/golang/protobuf v1.4.0-rc.4.0.20200313231945-b860323f09d0/go.mod h1:WU3c8KckQ9AFe+yFwt9sWVRKCVIyN9cPHBJSNnbL67w=
github.com/golang/protobuf v1.4.0/go.mod h1:jodUvKwWbYaEsadDk5Fwe5c77LiNKVO9IDvqG2KuDX0=
github.com/golang/protobuf v1.4.1/go.mod h1:U8fpvMrcmy5pZrNK1lt4xCsGvpyWQ/VVv6QDs8UjoX8=
github.com/golang/protobuf v1.4.2/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.4.3/go.mod h1:oDoupMAO8OvCJWAcko0GGGIgR6R6ocIYbsSw735rRwI=
github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaSAoJOfIk=
github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/google/go-cmp v0.3.0/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.3.1/go.mod h1:8QqcDgzrUqlUb/G2PQTWiueGozuR1884gddMywk6iLU=
github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
github.com/imdario/mergo v0.3.12/go.mod h1:jmQim1M+e3UYxmgPu/WyfjB3N3VflVyUjjjwH0dnCYA=
github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ=
github.com/shirou/gopsutil/v3 v3.21.8/go.mod h1:YWp/H8Qs5fVmf17v7JNZzA0mPJ+mS2e9JdiUF9LlKzQ=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs=
github.com/tklauser/numcpus v0.3.0/go.mod h1:yFGUr7TUHQRAhyqBcEg0Ge34zDBAsIvJJcyE6boqnA8=
github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k=
go.opentelemetry.io/otel v1.0.0-RC3/go.mod h1:Ka5j3ua8tZs4Rkq4Ex3hwgBgOchyPVq5S6P2lz//nKQ=
go.opentelemetry.io/otel/sdk v1.0.0-RC3/go.mod h1:78H6hyg2fka0NYT9fqGuFLvly2yCxiBXDJAgLKo/2Us=
go.opentelemetry.io/otel/trace v1.0.0-RC3/go.mod h1:VUt2TUYd8S2/ZRX09ZDFZQwn2RqfMB5MzO17jBojGxo=
go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE=
golang.org/x/lint v0.0.0-20190227174305-5b3e6a55c961/go.mod h1:wehouNa3lNwaWXcvxsM5YxQ5yQlVC4a0KAMCusXpPoU=
golang.org/x/lint v0.0.0-20190313153728-d0100b6bd8b3/go.mod h1:6SW0HCj/g11FgYtHlgUYUwCkIfeOF89ocIRzGO/8vkc=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190108225652-1e06a53dbb7e/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4 h1:4nGaVu0QrbjT/AK2PRLuQfQuh6DJve+pELhqTdAj3x0=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20200107190931-bf48bf16ab8d/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210816074244-15123e1e1f71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190226205152-f727befe758c/go.mod h1:9Yl7xja0Znq3iFh3HoIrodX9oNMXvdceNzlUR8zjMvY=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20190524140312-2c0ae7006135/go.mod h1:RgjU9mgBXZiqYHBnxXauZ1Gv1EHHAz9KjViQ78xBX0Q=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200130002326-2f3ba24bd6e7/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.4.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc=
google.golang.org/genproto v0.0.0-20190819201941-24fa4b261c55/go.mod h1:DMBHOl98Agz4BDEuKkezgsaosCRResVns1a3J2ZsMNc=
google.golang.org/genproto v0.0.0-20200513103714-09dca8ec2884/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20210805201207-89edb61ffb67/go.mod h1:ob2IJxKrgPT52GcgX759i1sleT07tiKowYBGbczaW48=
google.golang.org/grpc v1.19.0/go.mod h1:mqu4LbDTu4XGKhr4mRzUsmM4RtVoemTSY81AxZiDr8c=
google.golang.org/grpc v1.23.0/go.mod h1:Y5yQAOtifL1yxbo5wqy6BxZv8vAUGQwXBOALyacEbxg=
google.golang.org/grpc v1.25.1/go.mod h1:c3i+UQWmh7LiEpx4sFZnkU36qjEYZ0imhYfXVyQciAY=
google.golang.org/grpc v1.27.0/go.mod h1:qbnxyOmOxrQa7FizSgH+ReBfzJrCY1pSN7KXBS8abTk=
google.golang.org/grpc v1.33.1/go.mod h1:fr5YgcSWrqhRRxogOsw7RzIpsmvOZ6IcH4kBYTpR3n0=
google.golang.org/grpc v1.36.0/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAGRRjU=
google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE=
google.golang.org/grpc v1.39.1/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE=
google.golang.org/protobuf v0.0.0-20200109180630-ec00e32a8dfd/go.mod h1:DFci5gLYBciE7Vtevhsrf46CRTquxDuWsQurQQe4oz8=
google.golang.org/protobuf v0.0.0-20200221191635-4d8936d0db64/go.mod h1:kwYJMbMJ01Woi6D6+Kah6886xMZcty6N08ah7+eCXa0=
google.golang.org/protobuf v0.0.0-20200228230310-ab0ca4ff8a60/go.mod h1:cfTl7dwQJ+fmap5saPgwCLgHXTUD7jkjRqWcaiX5VyM=
google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miEFZTKqfCUM6K7xSMQL9OKL/b6hQv+e19PK+JZNE=
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.22.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.23.1-0.20200526195155-81db48ad09cc/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
google.golang.org/protobuf v1.25.0/go.mod h1:9JNX74DMeImyA3h4bdi1ymwjUzf21/xIlbajtzgsN7c=
google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp09yW+WbY/TyQbw=
google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
google.golang.org/protobuf v1.27.1/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.3/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190523083050-ea95bdfd59fc/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=

@ -0,0 +1,81 @@
package discovery
import (
"context"
"fmt"
"time"
"github.com/pkg/errors"
"github.com/go-kratos/kratos/v2/registry"
)
func filterInstancesByZone(ins *disInstancesInfo, zone string) []*registry.ServiceInstance {
zoneInstance, ok := ins.Instances[zone]
if !ok || len(zoneInstance) == 0 {
return nil
}
out := make([]*registry.ServiceInstance, 0, len(zoneInstance))
for _, v := range zoneInstance {
if v == nil {
continue
}
out = append(out, toServiceInstance(v))
}
return out
}
func (d *Discovery) GetService(ctx context.Context, serviceName string) ([]*registry.ServiceInstance, error) {
r := d.resolveBuild(serviceName)
ins, ok := r.Fetch(ctx)
if !ok {
return nil, errors.New("Discovery.GetService fetch failed")
}
out := filterInstancesByZone(ins, d.config.Zone)
if len(out) == 0 {
return nil, fmt.Errorf("Discovery.GetService(%s) not found", serviceName)
}
return out, nil
}
func (d *Discovery) Watch(ctx context.Context, serviceName string) (registry.Watcher, error) {
return &watcher{
Resolve: d.resolveBuild(serviceName),
serviceName: serviceName,
}, nil
}
type watcher struct {
*Resolve
serviceName string
}
func (w *watcher) Next() ([]*registry.ServiceInstance, error) {
event := w.Resolve.Watch()
// change event come
<-event
ctx, cancel := context.WithTimeout(context.TODO(), 15*time.Second)
defer cancel()
ins, ok := w.Resolve.Fetch(ctx)
if !ok {
return nil, errors.New("Discovery.GetService fetch failed")
}
out := filterInstancesByZone(ins, w.Resolve.d.config.Zone)
if len(out) == 0 {
return nil, fmt.Errorf("Discovery.GetService(%s) not found", w.serviceName)
}
return out, nil
}
func (w *watcher) Stop() error {
return w.Resolve.Close()
}

@ -0,0 +1,127 @@
package discovery
import (
"context"
"encoding/json"
"fmt"
"strconv"
"time"
"github.com/pkg/errors"
"github.com/go-kratos/kratos/v2/registry"
)
func (d *Discovery) Register(ctx context.Context, service *registry.ServiceInstance) (err error) {
ins := fromServerInstance(service, d.config)
d.mutex.Lock()
if _, ok := d.registry[ins.AppID]; ok {
err = errors.Wrap(ErrDuplication, ins.AppID)
} else {
d.registry[ins.AppID] = struct{}{}
}
d.mutex.Unlock()
if err != nil {
return
}
ctx, cancel := context.WithCancel(d.ctx)
if err = d.register(ctx, ins); err != nil {
d.mutex.Lock()
delete(d.registry, ins.AppID)
d.mutex.Unlock()
cancel()
return
}
ch := make(chan struct{}, 1)
d.cancelFunc = func() {
cancel()
<-ch
}
// renew the current register_service
go func() {
defer d.Logger().Warn("Discovery:register_service goroutine quit")
ticker := time.NewTicker(_registerGap)
defer ticker.Stop()
for {
select {
case <-ticker.C:
_ = d.renew(ctx, ins)
case <-ctx.Done():
_ = d.cancel(ins)
ch <- struct{}{}
return
}
}
}()
return
}
// register an instance with Discovery
func (d *Discovery) register(ctx context.Context, ins *discoveryInstance) (err error) {
d.mutex.RLock()
c := d.config
d.mutex.RUnlock()
var metadata []byte
if ins.Metadata != nil {
if metadata, err = json.Marshal(ins.Metadata); err != nil {
d.Logger().Errorf(
"Discovery:register instance Marshal metadata(%v) failed!error(%v)", ins.Metadata, err,
)
}
}
res := new(struct {
Code int `json:"code"`
Message string `json:"message"`
})
uri := fmt.Sprintf(_registerURL, d.pickNode())
// params
p := newParams(d.config)
p.Set(_paramKeyAppID, ins.AppID)
for _, addr := range ins.Addrs {
p.Add(_paramKeyAddrs, addr)
}
p.Set(_paramKeyVersion, ins.Version)
if ins.Status == 0 {
p.Set(_paramKeyStatus, _statusUP)
} else {
p.Set(_paramKeyStatus, strconv.FormatInt(ins.Status, 10))
}
p.Set(_paramKeyMetadata, string(metadata))
// send request to Discovery server.
if _, err = d.httpClient.R().
SetContext(ctx).
SetQueryParamsFromValues(p).
SetResult(&res).
Post(uri); err != nil {
d.switchNode()
d.Logger().Errorf("Discovery: register client.Get(%s) zone(%s) env(%s) appid(%s) addrs(%v) error(%v)",
uri+"?"+p.Encode(), c.Zone, c.Env, ins.AppID, ins.Addrs, err)
return
}
if res.Code != 0 {
err = fmt.Errorf("ErrorCode: %d", res.Code)
d.Logger().Errorf("Discovery: register client.Get(%v) env(%s) appid(%s) addrs(%v) code(%v)",
uri, c.Env, ins.AppID, ins.Addrs, res.Code)
}
d.Logger().Infof(
"Discovery: register client.Get(%v) env(%s) appid(%s) addrs(%s) success\n",
uri, c.Env, ins.AppID, ins.Addrs,
)
return
}
func (d *Discovery) Deregister(ctx context.Context, service *registry.ServiceInstance) error {
ins := fromServerInstance(service, d.config)
return d.cancel(ins)
}

@ -11,6 +11,7 @@ require (
github.com/go-kratos/kratos/contrib/config/apollo/v2 v2.0.0-20210901080230-515b71ec9061
github.com/go-kratos/kratos/contrib/metrics/prometheus/v2 v2.0.0-00010101000000-000000000000
github.com/go-kratos/kratos/contrib/registry/consul/v2 v2.0.0-00010101000000-000000000000
github.com/go-kratos/kratos/contrib/registry/discovery/v2 v2.0.0-00010101000000-000000000000
github.com/go-kratos/kratos/contrib/registry/etcd/v2 v2.0.0-00010101000000-000000000000
github.com/go-kratos/kratos/contrib/registry/nacos/v2 v2.0.0-00010101000000-000000000000
github.com/go-kratos/kratos/contrib/registry/zookeeper/v2 v2.0.0-00010101000000-000000000000
@ -49,6 +50,7 @@ replace (
github.com/go-kratos/kratos/contrib/config/apollo/v2 => ../contrib/config/apollo
github.com/go-kratos/kratos/contrib/metrics/prometheus/v2 => ../contrib/metrics/prometheus
github.com/go-kratos/kratos/contrib/registry/consul/v2 => ../contrib/registry/consul
github.com/go-kratos/kratos/contrib/registry/discovery/v2 => ../contrib/registry/discovery
github.com/go-kratos/kratos/contrib/registry/etcd/v2 => ../contrib/registry/etcd
github.com/go-kratos/kratos/contrib/registry/nacos/v2 => ../contrib/registry/nacos
github.com/go-kratos/kratos/contrib/registry/zookeeper/v2 => ../contrib/registry/zookeeper

@ -194,6 +194,8 @@ github.com/go-redis/redis/v8 v8.3.2/go.mod h1:jszGxBCez8QA1HWSmQxJO9Y82kNibbUmeY
github.com/go-redis/redis/v8 v8.5.0/go.mod h1:YmEcgBDttjnkbMzDAhDtQxY9yVA7jMN6PCR5HeMvqFE=
github.com/go-redis/redis/v8 v8.11.2 h1:WqlSpAwz8mxDSMCvbyz1Mkiqe0LE5OY4j3lgkvu1Ts0=
github.com/go-redis/redis/v8 v8.11.2/go.mod h1:DLomh7y2e3ggQXQLd1YgmvIfecPJoFl7WU5SOQ/r06M=
github.com/go-resty/resty/v2 v2.6.0 h1:joIR5PNLM2EFqqESUjCMGXrWmXNHEU9CEiK813oKYS4=
github.com/go-resty/resty/v2 v2.6.0/go.mod h1:PwvJS6hvaPkjtjNg9ph+VrSD92bi5Zq73w/BIH7cC3Q=
github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w=
github.com/go-sql-driver/mysql v1.5.1-0.20200311113236-681ffa848bae/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
github.com/go-sql-driver/mysql v1.6.0 h1:BCTh4TKNUYmOmMUcQ3IipzF5prigylS7XXjEkfCHuOE=

@ -0,0 +1,58 @@
package main
import (
"context"
"log"
"time"
srcgrpc "google.golang.org/grpc"
"github.com/go-kratos/kratos/contrib/registry/discovery/v2"
"github.com/go-kratos/kratos/examples/helloworld/helloworld"
"github.com/go-kratos/kratos/v2/transport/grpc"
)
func main() {
r := discovery.New(&discovery.Config{
Nodes: []string{"0.0.0.0:7171"},
Env: "dev",
Region: "sh1",
Zone: "zone1",
Host: "localhost",
}, nil)
connGRPC, err := grpc.DialInsecure(
context.Background(),
grpc.WithEndpoint("discovery:///helloworld"),
grpc.WithDiscovery(r),
)
if err != nil {
log.Fatal(err)
}
defer connGRPC.Close()
//connHTTP, err := http.NewClient(
// context.Background(),
// http.WithEndpoint("discovery:///helloworld"),
// http.WithDiscovery(r),
// http.WithBlock(),
//)
//if err != nil {
// log.Fatal(err)
//}
//defer connHTTP.Close()
for {
callGRPC(connGRPC)
time.Sleep(time.Second)
}
}
func callGRPC(conn *srcgrpc.ClientConn) {
client := helloworld.NewGreeterClient(conn)
reply, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "kratos"})
if err != nil {
log.Fatal(err)
}
log.Printf("[grpc] SayHello %+v\n", reply)
}

@ -0,0 +1,70 @@
package main
import (
"context"
"fmt"
"os"
"github.com/go-kratos/kratos/contrib/registry/discovery/v2"
"github.com/go-kratos/kratos/examples/helloworld/helloworld"
"github.com/go-kratos/kratos/v2"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/middleware/logging"
"github.com/go-kratos/kratos/v2/middleware/recovery"
"github.com/go-kratos/kratos/v2/transport/grpc"
"github.com/go-kratos/kratos/v2/transport/http"
)
// server is used to implement helloworld.GreeterServer.
type server struct {
helloworld.UnimplementedGreeterServer
}
// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
return &helloworld.HelloReply{Message: fmt.Sprintf("Welcome %+v!", in.Name)}, nil
}
func main() {
logger := log.NewStdLogger(os.Stdout)
logger = log.With(logger, "service", "example.registry.discovery")
r := discovery.New(&discovery.Config{
Nodes: []string{"0.0.0.0:7171"},
Env: "dev",
Region: "sh1",
Zone: "zone1",
Host: "localhost",
}, logger)
httpSrv := http.NewServer(
http.Address(":8000"),
http.Middleware(
recovery.Recovery(),
),
)
grpcSrv := grpc.NewServer(
grpc.Address(":9000"),
grpc.Middleware(
recovery.Recovery(),
logging.Server(logger),
),
)
s := &server{}
helloworld.RegisterGreeterServer(grpcSrv, s)
helloworld.RegisterGreeterHTTPServer(httpSrv, s)
app := kratos.New(
kratos.Name("helloworld"),
kratos.Server(
httpSrv,
grpcSrv,
),
kratos.Metadata(map[string]string{"color": "gray"}),
kratos.Registrar(r),
)
if err := app.Run(); err != nil {
log.NewHelper(logger).Fatal(err)
}
}
Loading…
Cancel
Save