@ -7,7 +7,6 @@ import (
"fmt"
"fmt"
"math/rand"
"math/rand"
"net/url"
"net/url"
"os"
"strconv"
"strconv"
"strings"
"strings"
"sync"
"sync"
@ -18,10 +17,7 @@ import (
"github.com/bilibili/kratos/pkg/ecode"
"github.com/bilibili/kratos/pkg/ecode"
"github.com/bilibili/kratos/pkg/log"
"github.com/bilibili/kratos/pkg/log"
"github.com/bilibili/kratos/pkg/naming"
"github.com/bilibili/kratos/pkg/naming"
bm "github.com/bilibili/kratos/pkg/net/http/blademaster"
http "github.com/bilibili/kratos/pkg/net/http/blademaster"
"github.com/bilibili/kratos/pkg/net/netutil"
"github.com/bilibili/kratos/pkg/net/netutil/breaker"
xstr "github.com/bilibili/kratos/pkg/str"
xtime "github.com/bilibili/kratos/pkg/time"
xtime "github.com/bilibili/kratos/pkg/time"
)
)
@ -30,16 +26,12 @@ const (
_setURL = "http://%s/discovery/set"
_setURL = "http://%s/discovery/set"
_cancelURL = "http://%s/discovery/cancel"
_cancelURL = "http://%s/discovery/cancel"
_renewURL = "http://%s/discovery/renew"
_renewURL = "http://%s/discovery/renew"
_pollURL = "http://%s/discovery/polls"
_pollURL = "http://%s/discovery/polls"
_nodesURL = "http://%s/discovery/nodes"
_registerGap = 30 * time . Second
_registerGap = 30 * time . Second
_statusUP = "1"
_statusUP = "1"
)
const (
_appid = "infra.discovery"
_appid = "infra.discovery"
)
)
@ -51,116 +43,105 @@ var (
ErrDuplication = errors . New ( "discovery: instance duplicate registration" )
ErrDuplication = errors . New ( "discovery: instance duplicate registration" )
)
)
var (
_once sync . Once
_builder naming . Builder
)
// Builder return default discvoery resolver builder.
func Builder ( ) naming . Builder {
_once . Do ( func ( ) {
_builder = New ( nil )
} )
return _builder
}
// Build register resolver into default discovery.
func Build ( id string ) naming . Resolver {
return Builder ( ) . Build ( id )
}
// Config discovery configures.
// Config discovery configures.
type Config struct {
type Config struct {
Nodes [ ] string
Nodes [ ] string
Region string
Zone string
Zone string
Env string
Env string
Host string
Host string
}
}
type appData struct {
type appData struct {
ZoneInstances map [ string ] [ ] * naming . Instance ` json:"zone_instances" `
Instances map [ string ] [ ] * naming . Instance ` json:"instances" `
LastTs int64 ` json:"latest_timestamp" `
LastTs int64 ` json:"latest_timestamp" `
Err string ` json:"err" `
}
}
// Discovery is discovery client.
// Discovery is discovery client.
type Discovery struct {
type Discovery struct {
c * Config
once sync . Once
once sync . Once
conf * Config
ctx context . Context
ctx context . Context
cancelFunc context . CancelFunc
cancelFunc context . CancelFunc
httpClient * bm . Client
httpClient * http . Client
node atomic . Value
nodeIdx uint64
mutex sync . RWMutex
mutex sync . RWMutex
apps map [ string ] * appInfo
apps map [ string ] * appInfo
registry map [ string ] struct { }
registry map [ string ] struct { }
lastHost string
lastHost string
cancelPolls context . CancelFunc
cancelPolls context . CancelFunc
idx uint64
node atomic . Value
delete chan * appInfo
delete chan * appInfo
}
}
type appInfo struct {
type appInfo struct {
resolver map [ * Resolve ] struct { }
zoneIns atomic . Value
zoneIns atomic . Value
resolver map [ * Resolver ] struct { }
lastTs int64 // latest timestamp
lastTs int64 // latest timestamp
}
}
func fixConfig ( c * Config ) {
func fixConfig ( c * Config ) {
if len ( c . Nodes ) == 0 {
if len ( c . Nodes ) == 0 {
c . Nodes = [ ] string { "NOTE: please config a default HOST" }
c . Nodes = strings . Split ( env . DiscoveryNodes , "," )
}
}
if env . Zone != "" {
if c . Region == "" {
c . Region = env . Region
}
if c . Zone == "" {
c . Zone = env . Zone
c . Zone = env . Zone
}
}
if env . DeployEnv != "" {
if c . Env = = "" {
c . Env = env . DeployEnv
c . Env = env . DeployEnv
}
}
if env . Hostname ! = "" {
if c . Host = = "" {
c . Host = env . Hostname
c . Host = env . Hostname
} else {
c . Host , _ = os . Hostname ( )
}
}
}
}
var (
once sync . Once
_defaultDiscovery * Discovery
)
func initDefault ( ) {
once . Do ( func ( ) {
_defaultDiscovery = New ( nil )
} )
}
// Builder return default discvoery resolver builder.
func Builder ( ) naming . Builder {
if _defaultDiscovery == nil {
initDefault ( )
}
return _defaultDiscovery
}
// Build register resolver into default discovery.
func Build ( id string ) naming . Resolver {
if _defaultDiscovery == nil {
initDefault ( )
}
return _defaultDiscovery . Build ( id )
}
// New new a discovery client.
// New new a discovery client.
func New ( c * Config ) ( d * Discovery ) {
func New ( c * Config ) ( d * Discovery ) {
if c == nil {
if c == nil {
c = & Config { }
c = new ( Config )
}
}
fixConfig ( c )
fixConfig ( c )
ctx , cancel := context . WithCancel ( context . Background ( ) )
ctx , cancel := context . WithCancel ( context . Background ( ) )
d = & Discovery {
d = & Discovery {
c : c ,
ctx : ctx ,
ctx : ctx ,
cancelFunc : cancel ,
cancelFunc : cancel ,
conf : c ,
apps : map [ string ] * appInfo { } ,
apps : map [ string ] * appInfo { } ,
registry : map [ string ] struct { } { } ,
registry : map [ string ] struct { } { } ,
delete : make ( chan * appInfo , 10 ) ,
delete : make ( chan * appInfo , 10 ) ,
}
}
// httpClient
// httpClient
cfg := & bm . ClientConfig {
cfg := & http . ClientConfig {
Dial : xtime . Duration ( 3 * time . Second ) ,
Dial : xtime . Duration ( 3 * time . Second ) ,
Timeout : xtime . Duration ( 40 * time . Second ) ,
Timeout : xtime . Duration ( 40 * time . Second ) ,
Breaker : & breaker . Config {
KeepAlive : xtime . Duration ( 40 * time . Second ) ,
Window : 100 ,
}
Sleep : 3 ,
d . httpClient = http . NewClient ( cfg )
Bucket : 10 ,
// discovery self
Ratio : 0.5 ,
Request : 100 ,
} ,
}
d . httpClient = bm . NewClient ( cfg )
resolver := d . Build ( _appid )
resolver := d . Build ( _appid )
event := resolver . Watch ( )
event := resolver . Watch ( )
_ , ok := <- event
_ , ok := <- event
@ -169,7 +150,7 @@ func New(c *Config) (d *Discovery) {
}
}
ins , ok := resolver . Fetch ( context . Background ( ) )
ins , ok := resolver . Fetch ( context . Background ( ) )
if ok {
if ok {
d . newSelf ( ins )
d . newSelf ( ins . Instances )
}
}
go d . selfproc ( resolver , event )
go d . selfproc ( resolver , event )
return
return
@ -183,13 +164,13 @@ func (d *Discovery) selfproc(resolver naming.Resolver, event <-chan struct{}) {
}
}
zones , ok := resolver . Fetch ( context . Background ( ) )
zones , ok := resolver . Fetch ( context . Background ( ) )
if ok {
if ok {
d . newSelf ( zones )
d . newSelf ( zones . Instances )
}
}
}
}
}
}
func ( d * Discovery ) newSelf ( zones map [ string ] [ ] * naming . Instance ) {
func ( d * Discovery ) newSelf ( zones map [ string ] [ ] * naming . Instance ) {
ins , ok := zones [ d . conf . Zone ]
ins , ok := zones [ d . c . Zone ]
if ! ok {
if ! ok {
return
return
}
}
@ -203,22 +184,22 @@ func (d *Discovery) newSelf(zones map[string][]*naming.Instance) {
}
}
}
}
// diff old nodes
// diff old nodes
olds , ok := d . node . Load ( ) . ( [ ] string )
var olds int
if ok {
var diff int
for _ , n := range nodes {
for _ , n := range nodes {
for _ , o := range olds {
if node , ok := d . node . Load ( ) . ( [ ] string ) ; ok {
for _ , o := range node {
if o == n {
if o == n {
diff ++
olds ++
break
break
}
}
}
}
}
}
if len ( nodes ) == diff {
return
}
}
if len ( nodes ) == olds {
return
}
}
rand . Shuffle ( len ( nodes ) , func ( i , j int ) {
// FIXME: we should use rand.Shuffle() in golang 1.10
shuffle ( len ( nodes ) , func ( i , j int ) {
nodes [ i ] , nodes [ j ] = nodes [ j ] , nodes [ i ]
nodes [ i ] , nodes [ j ] = nodes [ j ] , nodes [ i ]
} )
} )
d . node . Store ( nodes )
d . node . Store ( nodes )
@ -226,7 +207,7 @@ func (d *Discovery) newSelf(zones map[string][]*naming.Instance) {
// Build disovery resovler builder.
// Build disovery resovler builder.
func ( d * Discovery ) Build ( appid string ) naming . Resolver {
func ( d * Discovery ) Build ( appid string ) naming . Resolver {
r := & Resolver {
r := & Resolve {
id : appid ,
id : appid ,
d : d ,
d : d ,
event : make ( chan struct { } , 1 ) ,
event : make ( chan struct { } , 1 ) ,
@ -235,7 +216,7 @@ func (d *Discovery) Build(appid string) naming.Resolver {
app , ok := d . apps [ appid ]
app , ok := d . apps [ appid ]
if ! ok {
if ! ok {
app = & appInfo {
app = & appInfo {
resolver : make ( map [ * Resolver ] struct { } ) ,
resolver : make ( map [ * Resolve ] struct { } ) ,
}
}
d . apps [ appid ] = app
d . apps [ appid ] = app
cancel := d . cancelPolls
cancel := d . cancelPolls
@ -263,32 +244,32 @@ func (d *Discovery) Scheme() string {
return "discovery"
return "discovery"
}
}
// Resolver discveory resolver.
// Resolve discveory resolver.
type Resolver struct {
type Resolve struct {
id string
id string
event chan struct { }
event chan struct { }
d * Discovery
d * Discovery
}
}
// Watch watch instance.
// Watch watch instance.
func ( r * Resolver ) Watch ( ) <- chan struct { } {
func ( r * Resolve ) Watch ( ) <- chan struct { } {
return r . event
return r . event
}
}
// Fetch fetch resolver instance.
// Fetch fetch resolver instance.
func ( r * Resolver ) Fetch ( c context . Context ) ( ins map [ string ] [ ] * naming . Instance , ok bool ) {
func ( r * Resolve ) Fetch ( ctx context . Context ) ( ins * naming . InstancesInfo , ok bool ) {
r . d . mutex . RLock ( )
r . d . mutex . RLock ( )
app , ok := r . d . apps [ r . id ]
app , ok := r . d . apps [ r . id ]
r . d . mutex . RUnlock ( )
r . d . mutex . RUnlock ( )
if ok {
if ok {
ins , ok = app . zoneIns . Load ( ) . ( map [ string ] [ ] * naming . Instance )
ins , ok = app . zoneIns . Load ( ) . ( * naming . InstancesInfo )
return
return
}
}
return
return
}
}
// Close close resolver.
// Close close resolver.
func ( r * Resolver ) Close ( ) error {
func ( r * Resolve ) Close ( ) error {
r . d . mutex . Lock ( )
r . d . mutex . Lock ( )
if app , ok := r . d . apps [ r . id ] ; ok && len ( app . resolver ) != 0 {
if app , ok := r . d . apps [ r . id ] ; ok && len ( app . resolver ) != 0 {
delete ( app . resolver , r )
delete ( app . resolver , r )
@ -298,23 +279,11 @@ func (r *Resolver) Close() error {
return nil
return nil
}
}
func ( d * Discovery ) pickNode ( ) string {
nodes , ok := d . node . Load ( ) . ( [ ] string )
if ! ok || len ( nodes ) == 0 {
return d . conf . Nodes [ d . idx % uint64 ( len ( d . conf . Nodes ) ) ]
}
return nodes [ d . idx % uint64 ( len ( nodes ) ) ]
}
func ( d * Discovery ) switchNode ( ) {
atomic . AddUint64 ( & d . idx , 1 )
}
// Reload reload the config
// Reload reload the config
func ( d * Discovery ) Reload ( c * Config ) {
func ( d * Discovery ) Reload ( c * Config ) {
fixConfig ( c )
fixConfig ( c )
d . mutex . Lock ( )
d . mutex . Lock ( )
d . conf = c
d . c = c
d . mutex . Unlock ( )
d . mutex . Unlock ( )
}
}
@ -325,7 +294,7 @@ func (d *Discovery) Close() error {
}
}
// Register Register an instance with discovery and renew automatically
// Register Register an instance with discovery and renew automatically
func ( d * Discovery ) Register ( c context . Context , ins * naming . Instance ) ( cancelFunc context . CancelFunc , err error ) {
func ( d * Discovery ) Register ( ctx context . Context , ins * naming . Instance ) ( cancelFunc context . CancelFunc , err error ) {
d . mutex . Lock ( )
d . mutex . Lock ( )
if _ , ok := d . registry [ ins . AppID ] ; ok {
if _ , ok := d . registry [ ins . AppID ] ; ok {
err = ErrDuplication
err = ErrDuplication
@ -336,13 +305,15 @@ func (d *Discovery) Register(c context.Context, ins *naming.Instance) (cancelFun
if err != nil {
if err != nil {
return
return
}
}
if err = d . register ( c , ins ) ; err != nil {
ctx , cancel := context . WithCancel ( d . ctx )
if err = d . register ( ctx , ins ) ; err != nil {
d . mutex . Lock ( )
d . mutex . Lock ( )
delete ( d . registry , ins . AppID )
delete ( d . registry , ins . AppID )
d . mutex . Unlock ( )
d . mutex . Unlock ( )
cancel ( )
return
return
}
}
ctx , cancel := context . WithCancel ( d . ctx )
ch := make ( chan struct { } , 1 )
ch := make ( chan struct { } , 1 )
cancelFunc = context . CancelFunc ( func ( ) {
cancelFunc = context . CancelFunc ( func ( ) {
cancel ( )
cancel ( )
@ -355,10 +326,10 @@ func (d *Discovery) Register(c context.Context, ins *naming.Instance) (cancelFun
select {
select {
case <- ticker . C :
case <- ticker . C :
if err := d . renew ( ctx , ins ) ; err != nil && ecode . NothingFound . Equal ( err ) {
if err := d . renew ( ctx , ins ) ; err != nil && ecode . NothingFound . Equal ( err ) {
d . register ( ctx , ins )
_ = d . register ( ctx , ins )
}
}
case <- ctx . Done ( ) :
case <- ctx . Done ( ) :
d . cancel ( ins )
_ = d . cancel ( ins )
ch <- struct { } { }
ch <- struct { } { }
return
return
}
}
@ -367,148 +338,146 @@ func (d *Discovery) Register(c context.Context, ins *naming.Instance) (cancelFun
return
return
}
}
// Set set ins status and metadata.
// register Register an instance with discovery
func ( d * Discovery ) Set ( ins * naming . Instance ) error {
func ( d * Discovery ) register ( ctx context . Context , ins * naming . Instance ) ( err error ) {
return d . set ( context . Background ( ) , ins )
}
// cancel Remove the registered instance from discovery
func ( d * Discovery ) cancel ( ins * naming . Instance ) ( err error ) {
d . mutex . RLock ( )
d . mutex . RLock ( )
conf := d . conf
c := d . c
d . mutex . RUnlock ( )
d . mutex . RUnlock ( )
var metadata [ ] byte
if ins . Metadata != nil {
if metadata , err = json . Marshal ( ins . Metadata ) ; err != nil {
log . Error ( "discovery:register instance Marshal metadata(%v) failed!error(%v)" , ins . Metadata , err )
}
}
res := new ( struct {
res := new ( struct {
Code int ` json:"code" `
Code int ` json:"code" `
Message string ` json:"message" `
Message string ` json:"message" `
} )
} )
uri := fmt . Sprintf ( _cancelURL , d . pickNode ( ) )
uri := fmt . Sprintf ( _register URL , d . pickNode ( ) )
params := d . newParams ( conf )
params := d . newParams ( c )
params . Set ( "appid" , ins . AppID )
params . Set ( "appid" , ins . AppID )
// request
params . Set ( "addrs" , strings . Join ( ins . Addrs , "," ) )
if err = d . httpClient . Post ( context . Background ( ) , uri , "" , params , & res ) ; err != nil {
params . Set ( "version" , ins . Version )
params . Set ( "status" , _statusUP )
params . Set ( "metadata" , string ( metadata ) )
if err = d . httpClient . Post ( ctx , uri , "" , params , & res ) ; err != nil {
d . switchNode ( )
d . switchNode ( )
log . Error ( "discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) error(%v)" ,
log . Error ( "discovery: register client.Get(%v) zone(%s) env(%s) appid(%s) addrs(%v ) error(%v)" ,
uri , conf . Env , ins . AppID , conf . Host , err )
uri , c . Zone , c . Env , ins . AppID , ins . Addrs , err )
return
return
}
}
if ec := ecode . Int ( res . Code ) ; ! ec . Equal ( ecode . OK ) {
if ec := ecode . Int ( res . Code ) ; ! ec . Equal ( ecode . OK ) {
log . Warn ( "discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) code(%v)" ,
log . Warn ( "discovery: register client.Get(%v) env(%s) appid(%s) addrs(%v) code(%v)" , uri , c . Env , ins . AppID , ins . Addrs , res . Code )
uri , conf . Env , ins . AppID , conf . Host , res . Code )
err = ec
err = ec
return
return
}
}
log . Info ( "discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) success" ,
log . Info ( "discovery: register client.Get(%v) env(%s) appid(%s) addrs(%s) success" , uri , c . Env , ins . AppID , ins . Addrs )
uri , conf . Env , ins . AppID , conf . Host )
return
return
}
}
// register Register an instance with discovery
// renew Renew an instance with discovery
func ( d * Discovery ) register ( ctx context . Context , ins * naming . Instance ) ( err error ) {
func ( d * Discovery ) renew ( ctx context . Context , ins * naming . Instance ) ( err error ) {
d . mutex . RLock ( )
d . mutex . RLock ( )
conf := d . conf
c := d . c
d . mutex . RUnlock ( )
d . mutex . RUnlock ( )
var metadata [ ] byte
if ins . Metadata != nil {
if metadata , err = json . Marshal ( ins . Metadata ) ; err != nil {
log . Error ( "discovery:register instance Marshal metadata(%v) failed!error(%v)" , ins . Metadata , err )
}
}
res := new ( struct {
res := new ( struct {
Code int ` json:"code" `
Code int ` json:"code" `
Message string ` json:"message" `
Message string ` json:"message" `
} )
} )
uri := fmt . Sprintf ( _register URL , d . pickNode ( ) )
uri := fmt . Sprintf ( _renewURL , d . pickNode ( ) )
params := d . newParams ( conf )
params := d . newParams ( c )
params . Set ( "appid" , ins . AppID )
params . Set ( "appid" , ins . AppID )
params . Set ( "addrs" , strings . Join ( ins . Addrs , "," ) )
params . Set ( "version" , ins . Version )
params . Set ( "status" , _statusUP )
params . Set ( "metadata" , string ( metadata ) )
if err = d . httpClient . Post ( ctx , uri , "" , params , & res ) ; err != nil {
if err = d . httpClient . Post ( ctx , uri , "" , params , & res ) ; err != nil {
d . switchNode ( )
d . switchNode ( )
log . Error ( "discovery: register client.Get(%v) zone(%s) env(%s) appid(%s) addrs(%v ) error(%v)" ,
log . Error ( "discovery: renew client.Get(%v) env(%s) appid(%s) hostname(%s) error(%v)" ,
uri , conf . Zone , conf . Env , ins . AppID , ins . Addrs , err )
uri , c . Env , ins . AppID , c . Host , err )
return
return
}
}
if ec := ecode . Int ( res . Code ) ; ! ec . Equal ( ecode . OK ) {
if ec := ecode . Int ( res . Code ) ; ! ec . Equal ( ecode . OK ) {
log . Warn ( "discovery: register client.Get(%v) env(%s) appid(%s) addrs(%v) code(%v)" ,
uri , conf . Env , ins . AppID , ins . Addrs , res . Code )
err = ec
err = ec
if ec . Equal ( ecode . NothingFound ) {
return
}
log . Error ( "discovery: renew client.Get(%v) env(%s) appid(%s) hostname(%s) code(%v)" ,
uri , c . Env , ins . AppID , c . Host , res . Code )
return
return
}
}
log . Info ( "discovery: register client.Get(%v) env(%s) appid(%s) addrs(%s) success" ,
uri , conf . Env , ins . AppID , ins . Addrs )
return
return
}
}
// rset set instance info with discovery
// cancel Remove the registered instance from discovery
func ( d * Discovery ) set ( ctx context . Context , ins * naming . Instance ) ( err error ) {
func ( d * Discovery ) cancel ( ins * naming . Instance ) ( err error ) {
d . mutex . RLock ( )
d . mutex . RLock ( )
conf := d . conf
c := d . c
d . mutex . RUnlock ( )
d . mutex . RUnlock ( )
res := new ( struct {
res := new ( struct {
Code int ` json:"code" `
Code int ` json:"code" `
Message string ` json:"message" `
Message string ` json:"message" `
} )
} )
uri := fmt . Sprintf ( _set URL , d . pickNode ( ) )
uri := fmt . Sprintf ( _cancel URL , d . pickNode ( ) )
params := d . newParams ( conf )
params := d . newParams ( c )
params . Set ( "appid" , ins . AppID )
params . Set ( "appid" , ins . AppID )
params . Set ( "version" , ins . Version )
// request
params . Set ( "status" , strconv . FormatInt ( ins . Status , 10 ) )
if err = d . httpClient . Post ( context . TODO ( ) , uri , "" , params , & res ) ; err != nil {
if ins . Metadata != nil {
var metadata [ ] byte
if metadata , err = json . Marshal ( ins . Metadata ) ; err != nil {
log . Error ( "discovery:set instance Marshal metadata(%v) failed!error(%v)" , ins . Metadata , err )
}
params . Set ( "metadata" , string ( metadata ) )
}
if err = d . httpClient . Post ( ctx , uri , "" , params , & res ) ; err != nil {
d . switchNode ( )
d . switchNode ( )
log . Error ( "discovery: set client.Get(%v) zone(%s) env(%s) appid(%s) addrs(%v ) error(%v)" ,
log . Error ( "discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) error(%v)" ,
uri , conf . Zone , conf . Env , ins . AppID , ins . Addrs , err )
uri , c . Env , ins . AppID , c . Host , err )
return
return
}
}
if ec := ecode . Int ( res . Code ) ; ! ec . Equal ( ecode . OK ) {
if ec := ecode . Int ( res . Code ) ; ! ec . Equal ( ecode . OK ) {
log . Warn ( "discovery: set client.Get(%v) env(%s) appid(%s) addrs(%v) code(%v)" ,
log . Warn ( "discovery cancel client.Get(%v) env(%s) appid(%s) hostname(%s) code(%v)" ,
uri , conf . Env , ins . AppID , ins . Addrs , res . Code )
uri , c . Env , ins . AppID , c . Host , res . Code )
err = ec
err = ec
return
return
}
}
log . Info ( "discovery: set client.Get(%v) env(%s) appid(%s) addrs (%s) success" ,
log . Info ( "discovery cancel client.Get(%v) env(%s) appid(%s) hostname (%s) success" ,
uri + "?" + params . Encode ( ) , conf . Env , ins . AppID , ins . Addrs )
uri , c . Env , ins . AppID , c . Host )
return
return
}
}
// renew Renew an instance with discovery
// Set set ins status and metadata.
func ( d * Discovery ) renew ( ctx context . Context , ins * naming . Instance ) ( err error ) {
func ( d * Discovery ) Set ( ins * naming . Instance ) error {
return d . set ( context . Background ( ) , ins )
}
// set set instance info with discovery
func ( d * Discovery ) set ( ctx context . Context , ins * naming . Instance ) ( err error ) {
d . mutex . RLock ( )
d . mutex . RLock ( )
conf := d . conf
conf := d . c
d . mutex . RUnlock ( )
d . mutex . RUnlock ( )
res := new ( struct {
res := new ( struct {
Code int ` json:"code" `
Code int ` json:"code" `
Message string ` json:"message" `
Message string ` json:"message" `
} )
} )
uri := fmt . Sprintf ( _renew URL , d . pickNode ( ) )
uri := fmt . Sprintf ( _set URL , d . pickNode ( ) )
params := d . newParams ( conf )
params := d . newParams ( conf )
params . Set ( "appid" , ins . AppID )
params . Set ( "appid" , ins . AppID )
params . Set ( "version" , ins . Version )
params . Set ( "status" , _statusUP )
if ins . Metadata != nil {
var metadata [ ] byte
if metadata , err = json . Marshal ( ins . Metadata ) ; err != nil {
log . Error ( "discovery:set instance Marshal metadata(%v) failed!error(%v)" , ins . Metadata , err )
return
}
params . Set ( "metadata" , string ( metadata ) )
}
if err = d . httpClient . Post ( ctx , uri , "" , params , & res ) ; err != nil {
if err = d . httpClient . Post ( ctx , uri , "" , params , & res ) ; err != nil {
d . switchNode ( )
d . switchNode ( )
log . Error ( "discovery: renew client.Get(%v) env(%s) appid(%s) hostname(%s) error(%v)" ,
log . Error ( "discovery: set client.Get(%v) zone(%s) env(%s) appid(%s) addrs(%v ) error(%v)" ,
uri , conf . Env , ins . AppID , conf . Host , err )
uri , conf . Zone , conf . Env , ins . AppID , ins . Addrs , err )
return
return
}
}
if ec := ecode . Int ( res . Code ) ; ! ec . Equal ( ecode . OK ) {
if ec := ecode . Int ( res . Code ) ; ! ec . Equal ( ecode . OK ) {
log . Warn ( "discovery: set client.Get(%v) env(%s) appid(%s) addrs(%v) code(%v)" ,
uri , conf . Env , ins . AppID , ins . Addrs , res . Code )
err = ec
err = ec
if ec . Equal ( ecode . NothingFound ) {
return
}
log . Error ( "discovery: renew client.Get(%v) env(%s) appid(%s) hostname(%s) code(%v)" ,
uri , conf . Env , ins . AppID , conf . Host , res . Code )
return
return
}
}
log . Info ( "discovery: set client.Get(%v) env(%s) appid(%s) addrs(%s) success" , uri + "?" + params . Encode ( ) , conf . Env , ins . AppID , ins . Addrs )
return
return
}
}
@ -518,7 +487,6 @@ func (d *Discovery) serverproc() {
ctx context . Context
ctx context . Context
cancel context . CancelFunc
cancel context . CancelFunc
)
)
bc := netutil . DefaultBackoffConfig
ticker := time . NewTicker ( time . Minute * 30 )
ticker := time . NewTicker ( time . Minute * 30 )
defer ticker . Stop ( )
defer ticker . Stop ( )
for {
for {
@ -531,16 +499,17 @@ func (d *Discovery) serverproc() {
select {
select {
case <- d . ctx . Done ( ) :
case <- d . ctx . Done ( ) :
return
return
case <- ticker . C :
default :
default :
}
}
apps , err := d . polls ( ctx , d . pickNode ( ) )
apps , err := d . polls ( ctx )
if err != nil {
if err != nil {
d . switchNode ( )
d . switchNode ( )
if ctx . Err ( ) == context . Canceled {
if ctx . Err ( ) == context . Canceled {
ctx = nil
ctx = nil
continue
continue
}
}
time . Sleep ( bc . Backoff ( retry ) )
time . Sleep ( time . Second )
retry ++
retry ++
continue
continue
}
}
@ -549,38 +518,23 @@ func (d *Discovery) serverproc() {
}
}
}
}
func ( d * Discovery ) nodes ( ) ( nodes [ ] string ) {
func ( d * Discovery ) pickNode ( ) string {
res := new ( struct {
nodes , ok := d . node . Load ( ) . ( [ ] string )
Code int ` json:"code" `
if ! ok || len ( nodes ) == 0 {
Data [ ] struct {
return d . c . Nodes [ rand . Intn ( len ( d . c . Nodes ) ) ]
Addr string ` json:"addr" `
} ` json:"data" `
} )
uri := fmt . Sprintf ( _nodesURL , d . pickNode ( ) )
if err := d . httpClient . Get ( d . ctx , uri , "" , nil , res ) ; err != nil {
d . switchNode ( )
log . Error ( "discovery: consumer client.Get(%v)error(%+v)" , uri , err )
return
}
if ec := ecode . Int ( res . Code ) ; ! ec . Equal ( ecode . OK ) {
log . Error ( "discovery: consumer client.Get(%v) error(%v)" , uri , res . Code )
return
}
if len ( res . Data ) == 0 {
log . Warn ( "discovery: get nodes(%s) failed,no nodes found!" , uri )
return
}
}
nodes = make ( [ ] string , 0 , len ( res . Data ) )
return nodes [ atomic . LoadUint64 ( & d . nodeIdx ) % uint64 ( len ( nodes ) ) ]
for i := range res . Data {
nodes = append ( nodes , res . Data [ i ] . Addr )
}
}
return
func ( d * Discovery ) switchNode ( ) {
atomic . AddUint64 ( & d . nodeIdx , 1 )
}
}
func ( d * Discovery ) polls ( ctx context . Context , host string ) ( apps map [ string ] appData , err error ) {
func ( d * Discovery ) polls ( ctx context . Context ) ( apps map [ string ] * naming . InstancesInfo , err error ) {
var (
var (
lastTs [ ] int64
lastTss [ ] int64
appid [ ] string
appIDs [ ] string
host = d . pickNode ( )
changed bool
changed bool
)
)
if host != d . lastHost {
if host != d . lastHost {
@ -588,46 +542,41 @@ func (d *Discovery) polls(ctx context.Context, host string) (apps map[string]app
changed = true
changed = true
}
}
d . mutex . RLock ( )
d . mutex . RLock ( )
conf := d . conf
c := d . c
for k , v := range d . apps {
for k , v := range d . apps {
if changed {
if changed {
v . lastTs = 0
v . lastTs = 0
}
}
appid = append ( appid , k )
appIDs = append ( appIDs , k )
lastTs = append ( lastTs , v . lastTs )
lastTss = append ( lastTs s , v . lastTs )
}
}
d . mutex . RUnlock ( )
d . mutex . RUnlock ( )
if len ( appid ) == 0 {
if len ( appIDs ) == 0 {
return
return
}
}
uri := fmt . Sprintf ( _pollURL , host )
uri := fmt . Sprintf ( _pollURL , host )
res := new ( struct {
res := new ( struct {
Code int ` json:"code" `
Code int ` json:"code" `
Message string ` json:"message" `
Data map [ string ] * naming . InstancesInfo ` json:"data" `
Data map [ string ] appData ` json:"data" `
} )
} )
params := url . Values { }
params := url . Values { }
params . Set ( "env" , conf . Env )
params . Set ( "env" , c . Env )
params . Set ( "hostname" , conf . Host )
params . Set ( "hostname" , c . Host )
params . Set ( "appid" , strings . Join ( appid , "," ) )
for _ , appid := range appIDs {
params . Set ( "latest_timestamp" , xstr . JoinInts ( lastTs ) )
params . Add ( "appid" , appid )
}
for _ , ts := range lastTss {
params . Add ( "latest_timestamp" , strconv . FormatInt ( ts , 10 ) )
}
if err = d . httpClient . Get ( ctx , uri , "" , params , res ) ; err != nil {
if err = d . httpClient . Get ( ctx , uri , "" , params , res ) ; err != nil {
d . switchNode ( )
log . Error ( "discovery: client.Get(%s) error(%+v)" , uri + "?" + params . Encode ( ) , err )
log . Error ( "discovery: client.Get(%s) error(%+v)" , uri + "?" + params . Encode ( ) , err )
return
return
}
}
if ec := ecode . Int ( res . Code ) ; ! ec . Equal ( ecode . OK ) {
if ec := ecode . Int ( res . Code ) ; ! ec . Equal ( ecode . OK ) {
if ! ec . Equal ( ecode . NotModified ) {
if ! ec . Equal ( ecode . NotModified ) {
log . Error ( "discovery: client.Get(%s) get error code(%d) message(%s) " , uri + "?" + params . Encode ( ) , res . Code , res . Messag e )
log . Error ( "discovery: client.Get(%s) get error code(%d)" , uri + "?" + params . Encode ( ) , res . Code )
err = ec
err = ec
if ec . Equal ( ecode . NothingFound ) {
for appID , value := range res . Data {
if value . Err != "" {
errInfo := fmt . Sprintf ( "discovery: app(%s) on ENV(%s) %s!\n" , appID , conf . Env , value . Err )
log . Error ( errInfo )
fmt . Fprintf ( os . Stderr , errInfo )
}
}
}
}
}
return
return
}
}
@ -639,18 +588,17 @@ func (d *Discovery) polls(ctx context.Context, host string) (apps map[string]app
return
return
}
}
}
}
log . Info ( "discovery: polls uri(%s)" , uri + "?" + params . Encode ( ) )
log . Info ( "discovery: successfully polls(%s) instances (%s)" , uri + "?" + params . Encode ( ) , info )
log . Info ( "discovery: successfully polls(%s) instances (%s)" , uri + "?" + params . Encode ( ) , info )
apps = res . Data
apps = res . Data
return
return
}
}
func ( d * Discovery ) broadcast ( apps map [ string ] appData ) {
func ( d * Discovery ) broadcast ( apps map [ string ] * naming . InstancesInfo ) {
for id , v := range apps {
for appID , v := range apps {
var count int
var count int
for zone , ins := range v . Zone Instances {
for zone , ins := range v . Instances {
if len ( ins ) == 0 {
if len ( ins ) == 0 {
delete ( v . Zone Instances, zone )
delete ( v . Instances , zone )
}
}
count += len ( ins )
count += len ( ins )
}
}
@ -658,11 +606,11 @@ func (d *Discovery) broadcast(apps map[string]appData) {
continue
continue
}
}
d . mutex . RLock ( )
d . mutex . RLock ( )
app , ok := d . apps [ id ]
app , ok := d . apps [ appID ]
d . mutex . RUnlock ( )
d . mutex . RUnlock ( )
if ok {
if ok {
app . lastTs = v . LastTs
app . lastTs = v . LastTs
app . zoneIns . Store ( v . ZoneInstances )
app . zoneIns . Store ( v )
d . mutex . RLock ( )
d . mutex . RLock ( )
for rs := range app . resolver {
for rs := range app . resolver {
select {
select {
@ -675,10 +623,38 @@ func (d *Discovery) broadcast(apps map[string]appData) {
}
}
}
}
func ( d * Discovery ) newParams ( conf * Config ) url . Values {
func ( d * Discovery ) newParams ( c * Config ) url . Values {
params := url . Values { }
params := url . Values { }
params . Set ( "zone" , conf . Zone )
params . Set ( "region" , c . Region )
params . Set ( "env" , conf . Env )
params . Set ( "zone" , c . Zone )
params . Set ( "hostname" , conf . Host )
params . Set ( "env" , c . Env )
params . Set ( "hostname" , c . Host )
return params
return params
}
}
var r = rand . New ( rand . NewSource ( time . Now ( ) . UnixNano ( ) ) )
// shuffle pseudo-randomizes the order of elements.
// n is the number of elements. Shuffle panics if n < 0.
// swap swaps the elements with indexes i and j.
func shuffle ( n int , swap func ( i , j int ) ) {
if n < 0 {
panic ( "invalid argument to Shuffle" )
}
// Fisher-Yates shuffle: https://en.wikipedia.org/wiki/Fisher%E2%80%93Yates_shuffle
// Shuffle really ought not be called with n that doesn't fit in 32 bits.
// Not only will it take a very long time, but with 2³¹! possible permutations,
// there's no way that any PRNG can have a big enough internal state to
// generate even a minuscule percentage of the possible permutations.
// Nevertheless, the right API signature accepts an int n, so handle it as best we can.
i := n - 1
for ; i > 1 << 31 - 1 - 1 ; i -- {
j := int ( r . Int63n ( int64 ( i + 1 ) ) )
swap ( i , j )
}
for ; i > 0 ; i -- {
j := int ( r . Int31n ( int32 ( i + 1 ) ) )
swap ( i , j )
}
}