@ -19,9 +19,9 @@ import (
)
)
var (
var (
//Prefix is a etcd globe key prefix
//etcd Prefix is a etcd globe key prefix
Endpoints string
endpoints string
Prefix string
etcd Prefix string
RegisterTTL = 30
RegisterTTL = 30
)
)
@ -39,8 +39,8 @@ func init() {
func addFlag ( fs * flag . FlagSet ) {
func addFlag ( fs * flag . FlagSet ) {
// env
// env
fs . StringVar ( & E ndpoints, "etcd.endpoints" , os . Getenv ( "ETCD_ENDPOINTS" ) , "etcd.endpoints is etcd endpoints. value: 127.0.0.1:2379,127.0.0.2:2379 etc." )
fs . StringVar ( & e ndpoints, "etcd.endpoints" , os . Getenv ( "ETCD_ENDPOINTS" ) , "etcd.endpoints is etcd endpoints. value: 127.0.0.1:2379,127.0.0.2:2379 etc." )
fs . StringVar ( & Prefix , "etcd.prefix" , defaultString ( "ETCD_PREFIX" , "kratos_etcd" ) , "etcd globe key prefix or use ETCD_PREFIX env variable. value etcd_prefix etc." )
fs . StringVar ( & etcd Prefix, "etcd.prefix" , defaultString ( "ETCD_PREFIX" , "kratos_etcd" ) , "etcd globe key prefix or use ETCD_PREFIX env variable. value etcd_prefix etc." )
}
}
func defaultString ( env , value string ) string {
func defaultString ( env , value string ) string {
@ -91,11 +91,11 @@ type Resolve struct {
// New is new a etcdbuilder
// New is new a etcdbuilder
func New ( c * clientv3 . Config ) ( e * EtcdBuilder , err error ) {
func New ( c * clientv3 . Config ) ( e * EtcdBuilder , err error ) {
if c == nil {
if c == nil {
if E ndpoints == "" {
if e ndpoints == "" {
panic ( fmt . Errorf ( "invalid etcd config endpoints:%+v" , E ndpoints) )
panic ( fmt . Errorf ( "invalid etcd config endpoints:%+v" , e ndpoints) )
}
}
c = & clientv3 . Config {
c = & clientv3 . Config {
Endpoints : strings . Split ( E ndpoints, "," ) ,
Endpoints : strings . Split ( e ndpoints, "," ) ,
DialTimeout : time . Second * 30 ,
DialTimeout : time . Second * 30 ,
}
}
}
}
@ -227,7 +227,7 @@ func (e *EtcdBuilder) unregister(ins *naming.Instance) (err error) {
}
}
func ( e * EtcdBuilder ) keyPrefix ( ins * naming . Instance ) string {
func ( e * EtcdBuilder ) keyPrefix ( ins * naming . Instance ) string {
return fmt . Sprintf ( "/%s/%s/%s" , Prefix , ins . AppID , ins . Hostname )
return fmt . Sprintf ( "/%s/%s/%s" , etcd Prefix, ins . AppID , ins . Hostname )
}
}
// Close stop all running process including etcdfetch and register
// Close stop all running process including etcdfetch and register
@ -237,7 +237,7 @@ func (e *EtcdBuilder) Close() error {
}
}
func ( a * appInfo ) watch ( appID string ) {
func ( a * appInfo ) watch ( appID string ) {
_ = a . fetchstore ( appID )
_ = a . fetchstore ( appID )
prefix := fmt . Sprintf ( "/%s/%s/" , Prefix , appID )
prefix := fmt . Sprintf ( "/%s/%s/" , etcd Prefix, appID )
rch := a . e . cli . Watch ( a . e . ctx , prefix , clientv3 . WithPrefix ( ) )
rch := a . e . cli . Watch ( a . e . ctx , prefix , clientv3 . WithPrefix ( ) )
for wresp := range rch {
for wresp := range rch {
for _ , ev := range wresp . Events {
for _ , ev := range wresp . Events {
@ -249,7 +249,7 @@ func (a *appInfo) watch(appID string) {
}
}
func ( a * appInfo ) fetchstore ( appID string ) ( err error ) {
func ( a * appInfo ) fetchstore ( appID string ) ( err error ) {
prefix := fmt . Sprintf ( "/%s/%s/" , Prefix , appID )
prefix := fmt . Sprintf ( "/%s/%s/" , etcd Prefix, appID )
resp , err := a . e . cli . Get ( a . e . ctx , prefix , clientv3 . WithPrefix ( ) )
resp , err := a . e . cli . Get ( a . e . ctx , prefix , clientv3 . WithPrefix ( ) )
if err != nil {
if err != nil {
log . Error ( "etcd: fetch client.Get(%s) error(%+v)" , prefix , err )
log . Error ( "etcd: fetch client.Get(%s) error(%+v)" , prefix , err )