diff --git a/pkg/naming/etcd/etcd.go b/pkg/naming/etcd/etcd.go index 044f5fc33..f64f107f4 100644 --- a/pkg/naming/etcd/etcd.go +++ b/pkg/naming/etcd/etcd.go @@ -4,7 +4,10 @@ import ( "context" "encoding/json" "errors" + "flag" "fmt" + "os" + "strings" "sync" "sync/atomic" "time" @@ -15,9 +18,10 @@ import ( "github.com/coreos/etcd/mvcc/mvccpb" ) -const ( +var ( //Prefix is a etcd globe key prefix - Prefix = "kratos_etcd3" + Endpoints string + Prefix string RegisterTTL = 30 ) @@ -29,10 +33,28 @@ var ( ErrDuplication = errors.New("etcd: instance duplicate registration") ) +func init() { + addFlag(flag.CommandLine) +} + +func addFlag(fs *flag.FlagSet) { + // env + fs.StringVar(&Endpoints, "etcd.endpoints", os.Getenv("ETCD_ENDPOINTS"), "etcd.endpoints is etcd endpoints. value: 127.0.0.1:2379,127.0.0.2:2379 etc.") + fs.StringVar(&Prefix, "etcd.prefix", defaultString("ETCD_PREFIX", "kratos_etcd"), "etcd globe key prefix or use ETCD_PREFIX env variable. value etcd_prefix etc.") +} + +func defaultString(env, value string) string { + v := os.Getenv(env) + if v == "" { + return value + } + return v +} + // Builder return default etcd resolver builder. func Builder(c *clientv3.Config) naming.Builder { _once.Do(func() { - _builder,_ = New(c) + _builder, _ = New(c) }) return _builder } @@ -42,7 +64,7 @@ func Build(c *clientv3.Config, id string) naming.Resolver { return Builder(c).Build(id) } -//EtcdBuilder is a etcd clientv3 EtcdBuilder +// EtcdBuilder is a etcd clientv3 EtcdBuilder type EtcdBuilder struct { cli *clientv3.Client ctx context.Context @@ -65,11 +87,21 @@ type Resolve struct { event chan struct{} e *EtcdBuilder } -//New is new a etcdbuilder -func New(c *clientv3.Config) (e *EtcdBuilder,err error) { + +// New is new a etcdbuilder +func New(c *clientv3.Config) (e *EtcdBuilder, err error) { + if c == nil { + if Endpoints == "" { + panic(fmt.Errorf("invalid etcd config endpoints:%+v", Endpoints)) + } + c = &clientv3.Config{ + Endpoints: strings.Split(Endpoints, ","), + DialTimeout: time.Second * 30, + } + } cli, err := clientv3.New(*c) if err != nil { - return nil,err + return nil, err } ctx, cancel := context.WithCancel(context.Background()) e = &EtcdBuilder{ @@ -120,7 +152,7 @@ func (e *EtcdBuilder) Scheme() string { } -//Register is register instance +// Register is register instance func (e *EtcdBuilder) Register(ctx context.Context, ins *naming.Instance) (cancelFunc context.CancelFunc, err error) { e.mutex.Lock() if _, ok := e.registry[ins.AppID]; ok { @@ -148,7 +180,7 @@ func (e *EtcdBuilder) Register(ctx context.Context, ins *naming.Instance) (cance go func() { //提前2秒续约 避免续约操作缓慢时租约过期 - ticker := time.NewTicker((RegisterTTL - 2) * time.Second) + ticker := time.NewTicker(time.Duration(RegisterTTL-2) * time.Second) defer ticker.Stop() for { select { @@ -166,7 +198,7 @@ func (e *EtcdBuilder) Register(ctx context.Context, ins *naming.Instance) (cance //注册和续约公用一个操作 func (e *EtcdBuilder) register(ctx context.Context, ins *naming.Instance) (err error) { - prefix := e.getprefix(ins) + prefix := e.keyPrefix(ins) val, _ := json.Marshal(ins) ttlResp, err := e.cli.Grant(context.TODO(), int64(RegisterTTL)) @@ -183,7 +215,7 @@ func (e *EtcdBuilder) register(ctx context.Context, ins *naming.Instance) (err e return nil } func (e *EtcdBuilder) unregister(ins *naming.Instance) (err error) { - prefix := e.getprefix(ins) + prefix := e.keyPrefix(ins) if _, err = e.cli.Delete(context.TODO(), prefix); err != nil { log.Error("etcd: unregister client.Delete(%v) appid(%s) hostname(%s) error(%v)", @@ -194,7 +226,7 @@ func (e *EtcdBuilder) unregister(ins *naming.Instance) (err error) { return } -func (e *EtcdBuilder) getprefix(ins *naming.Instance) string { +func (e *EtcdBuilder) keyPrefix(ins *naming.Instance) string { return fmt.Sprintf("/%s/%s/%s", Prefix, ins.AppID, ins.Hostname) } diff --git a/pkg/naming/etcd/etcd_test.go b/pkg/naming/etcd/etcd_test.go index 7d8e8f7e4..85cbfe69b 100644 --- a/pkg/naming/etcd/etcd_test.go +++ b/pkg/naming/etcd/etcd_test.go @@ -12,12 +12,12 @@ import ( func TestNew(t *testing.T) { config := &clientv3.Config{ - Endpoints: []string{"127.0.0.1:2379"}, - DialTimeout:time.Second*3, + Endpoints: []string{"127.0.0.1:2379"}, + DialTimeout: time.Second * 3, } - builder,err := New(config) + builder, err := New(config) - if(err != nil){ + if err != nil { fmt.Println("etcd 连接失败") return } @@ -67,7 +67,7 @@ func TestNew(t *testing.T) { fmt.Printf("app: %s host %s \n", in.AppID, in.Hostname) } } - }else{ + } else { fmt.Printf("\n") } fmt.Printf("app2: ") @@ -79,7 +79,7 @@ func TestNew(t *testing.T) { fmt.Printf("app: %s host %s \n", in.AppID, in.Hostname) } } - }else{ + } else { fmt.Printf("\n") } time.Sleep(time.Second) @@ -94,5 +94,5 @@ func TestNew(t *testing.T) { fmt.Println("取消app2") app2Cancel() - time.Sleep(30*time.Second) + time.Sleep(30 * time.Second) }