From 272f26a26b6ea98c59aa4b10b4729d4f3031c0e6 Mon Sep 17 00:00:00 2001 From: "libi.chai" Date: Thu, 18 Jul 2019 17:32:34 +0800 Subject: [PATCH 1/5] add etcd naming --- go.mod | 2 +- go.sum | 12 ++ pkg/naming/etcd/etcd.go | 284 +++++++++++++++++++++++++++++++++++ pkg/naming/etcd/etcd_test.go | 93 ++++++++++++ 4 files changed, 390 insertions(+), 1 deletion(-) create mode 100644 pkg/naming/etcd/etcd.go create mode 100644 pkg/naming/etcd/etcd_test.go diff --git a/go.mod b/go.mod index d21eb3e15..d0827055a 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( cloud.google.com/go v0.41.0 // indirect github.com/BurntSushi/toml v0.3.1 github.com/aristanetworks/goarista v0.0.0-20190409234242-46f4bc7b73ef // indirect + github.com/coreos/etcd v3.3.13+incompatible github.com/cznic/b v0.0.0-20181122101859-a26611c4d92d // indirect github.com/cznic/mathutil v0.0.0-20181122101859-297441e03548 // indirect github.com/cznic/strutil v0.0.0-20181122101858-275e90344537 // indirect @@ -51,7 +52,6 @@ require ( gopkg.in/go-playground/validator.v9 v9.26.0 gopkg.in/yaml.v2 v2.2.2 honnef.co/go/tools v0.0.0-20190605142022-0a11fc526260 // indirect - ) replace ( diff --git a/go.sum b/go.sum index 72b08b006..ea63e9cb8 100644 --- a/go.sum +++ b/go.sum @@ -2,11 +2,16 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= +github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d h1:G0m3OIz70MZUWq3EgK3CesDbo8upS2Vm9/P3FtgI+Jk= +github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/aristanetworks/goarista v0.0.0-20190409234242-46f4bc7b73ef h1:ajsnF5qTstiBlP+V/mgh91zZfoKP477KfSmRoCoyYGU= github.com/aristanetworks/goarista v0.0.0-20190409234242-46f4bc7b73ef/go.mod h1:D/tb0zPVXnP7fmsLZjtdUhSsumbK/ij54UXjjVgMGxQ= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= +github.com/coreos/etcd v3.3.13+incompatible h1:8F3hqu9fGYLBifCmRCJsicFqDx/D68Rt3q1JMazcgBQ= +github.com/coreos/etcd v3.3.13+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY= github.com/cznic/b v0.0.0-20181122101859-a26611c4d92d h1:SwD98825d6bdB+pEuTxWOXiSjBrHdOl/UVp75eI7JT8= github.com/cznic/b v0.0.0-20181122101859-a26611c4d92d/go.mod h1:URriBxXwVq5ijiJ12C7iIZqlA69nTlI+LgI6/pwftG8= @@ -23,10 +28,13 @@ github.com/dgryski/go-farm v0.0.0-20190323231341-8198c7b169ec/go.mod h1:SqUrOPUn github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= +github.com/etcd-io/etcd v3.3.10+incompatible/go.mod h1:cdZ77EstHBwVtD6iTgzgvogwcjo9m4iOqoijouPJ4bs= github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= +github.com/go-ole/go-ole v1.2.4 h1:nNBDSCOigTSiarFpYE9J/KtEA1IOW4CNeqT9TQDqCxI= +github.com/go-ole/go-ole v1.2.4/go.mod h1:XCwSNxSkXRo4vlyPy93sltvi/qJq0jqQhjqQNIwKuxM= github.com/go-playground/locales v0.12.1 h1:2FITxuFt/xuCNP1Acdhv62OzaCiviiE4kotfhkmOqEc= github.com/go-playground/locales v0.12.1/go.mod h1:IUMDtCfWo/w/mtMfIE/IG2K+Ey3ygWanZIBtBW0W2TM= github.com/go-playground/universal-translator v0.16.0 h1:X++omBR/4cE2MNg91AoC3rmGrCjJ8eAeUP/K/EKx4DM= @@ -44,6 +52,7 @@ github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfU github.com/golang/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:tluoj9z5200jBnyusfRPU2LqT6J+DAorxEvtC7LHB+E= github.com/golang/mock v1.1.1 h1:G5FRp8JnTd7RQH5kemVNlMeyXQAztQ3mOWV95KxsXH8= github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s= github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y= github.com/golang/net v0.0.0-20190311183353-d8887717615a h1:4V+LPwzBFLRg7XSXZw133Jsur1mTVMY73hIv/FTdrbg= github.com/golang/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= @@ -75,9 +84,11 @@ github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpO github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/pty v1.1.8/go.mod h1:O1sed60cT9XZ5uDucP5qwvh+TE3NnUj51EiZO/lmSfw= +github.com/kr/text v0.1.0 h1:45sCR5RtlFHMR4UwH9sdQ5TC8v0qDQCHnXt+kaKSTVE= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/leodido/go-urn v1.1.0 h1:Sm1gr51B1kKyfD2BlRcLSiEkffoG96g6TPv6eRoEiB8= github.com/leodido/go-urn v1.1.0/go.mod h1:+cyI34gQWZcE1eQU7NVgKkkzdXDQHr1dBMtdAPozLkw= @@ -134,6 +145,7 @@ golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKG golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= diff --git a/pkg/naming/etcd/etcd.go b/pkg/naming/etcd/etcd.go new file mode 100644 index 000000000..22f0b9de8 --- /dev/null +++ b/pkg/naming/etcd/etcd.go @@ -0,0 +1,284 @@ +package etcd + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "time" + "sync" + "sync/atomic" + + "github.com/bilibili/kratos/pkg/log" + "github.com/bilibili/kratos/pkg/naming" + "github.com/coreos/etcd/clientv3" + "github.com/coreos/etcd/mvcc/mvccpb" +) +const ( + Prefix = "kratos_etcd3" + + RegisterTTL = 30 +) +var ( + _once sync.Once + _builder naming.Builder + + ErrDuplication = errors.New("etcd: instance duplicate registration") +) + +// Builder return default etcd resolver builder. +func Builder(c *clientv3.Config) naming.Builder { + _once.Do(func() { + _builder = New(c) + }) + return _builder +} +// Build register resolver into default etcd. +func Build(c *clientv3.Config,id string) naming.Resolver { + return Builder(c).Build(id) +} +//Etcd is a etcd clientv3 builder +type EtcdBuilder struct { + cli *clientv3.Client + ctx context.Context + cancelFunc context.CancelFunc + + mutex sync.RWMutex + apps map[string]*appInfo + registry map[string]struct{} +} +type appInfo struct { + resolver map[*Resolve]struct{} + ins atomic.Value + e *EtcdBuilder + once sync.Once +} +// Resolve etch resolver. +type Resolve struct { + id string + event chan struct{} + e *EtcdBuilder + +} + +func New(c *clientv3.Config) (e *EtcdBuilder){ + cli,err := clientv3.New(*c) + if(err != nil){ + panic(err) + } + ctx, cancel := context.WithCancel(context.Background()) + e = &EtcdBuilder{ + cli: cli, + ctx: ctx, + cancelFunc: cancel, + apps: map[string]*appInfo{}, + registry: map[string]struct{}{}, + delete: make(chan *appInfo, 10), + } + return +} + +// Build disovery resovler builder. +func (e *EtcdBuilder) Build(appid string) naming.Resolver { + r := &Resolve{ + id: appid, + e: e, + event: make(chan struct{}, 1), + } + e.mutex.Lock() + app, ok := e.apps[appid] + if !ok { + app = &appInfo{ + resolver: make(map[*Resolve]struct{}), + e:e, + } + e.apps[appid] = app + } + app.resolver[r] = struct{}{} + e.mutex.Unlock() + if ok { + select { + case r.event <- struct{}{}: + default: + } + } + + app.once.Do(func(){ + go app.watch(appid) + log.Info("etcd: AddWatch(%s) already watch(%v)", appid, ok) + }) + return r +} + +// Scheme return etcd's scheme +func (e *EtcdBuilder) Scheme() string { + return "etcd" + +} + +// +func (e *EtcdBuilder) Register(ctx context.Context, ins *naming.Instance) (cancelFunc context.CancelFunc, err error) { + e.mutex.Lock() + if _, ok := e.registry[ins.AppID]; ok { + err = ErrDuplication + } else { + e.registry[ins.AppID] = struct{}{} + } + e.mutex.Unlock() + if err != nil { + return + } + + ctx, cancel := context.WithCancel(e.ctx) + if err = e.register(ctx, ins); err != nil { + e.mutex.Lock() + delete(e.registry, ins.AppID) + e.mutex.Unlock() + cancel() + return + } + ch := make(chan struct{}, 1) + cancelFunc = context.CancelFunc(func() { + cancel() + <-ch + }) + + go func() { + //提前2秒续约 避免续约操作缓慢时租约过期 + ticker := time.NewTicker((RegisterTTL-2)*time.Second) + defer ticker.Stop() + for { + select { + case <-ticker.C: + _ = e.register(ctx, ins) + case <-ctx.Done(): + _ = e.unregister(ins) + ch <- struct{}{} + return + } + } + }() + return +} +//注册和续约公用一个操作 +func (e *EtcdBuilder)register(ctx context.Context, ins *naming.Instance) (err error){ + prefix := e.getprefix(ins) + val ,_ := json.Marshal(ins) + + ttlResp, err := e.cli.Grant(context.TODO(), int64(RegisterTTL)) + if(err != nil){ + log.Error("etcd: register client.Grant(%v) error(%v)",RegisterTTL,err) + return err + } + _,err = e.cli.Put(ctx,prefix,string(val),clientv3.WithLease(ttlResp.ID)) + if(err != nil){ + log.Error("etcd: register client.Put(%v) appid(%s) hostname(%s) error(%v)", + prefix,ins.AppID,ins.Hostname, err) + return err + } + return nil +} +func (e *EtcdBuilder)unregister(ins *naming.Instance) (err error){ + prefix := e.getprefix(ins) + + if _,err = e.cli.Delete(context.TODO(),prefix); err != nil{ + log.Error("etcd: unregister client.Delete(%v) appid(%s) hostname(%s) error(%v)", + prefix, ins.AppID, ins.Hostname, err) + } + log.Info("etcd: unregister client.Delete(%v) appid(%s) hostname(%s) success", + prefix, ins.AppID, ins.Hostname) + return +} + + +func (e *EtcdBuilder)getprefix(ins *naming.Instance) string{ + return fmt.Sprintf("/%s/%s/%s", Prefix, ins.AppID,ins.Hostname) +} +// Close stop all running process including etcdfetch and register +func (e *EtcdBuilder) Close() error { + e.cancelFunc() + return nil +} +func (a *appInfo)watch(appID string){ + _ = a.fetchstore(appID) + prefix := fmt.Sprintf("/%s/%s/",Prefix,appID) + rch := a.e.cli.Watch(a.e.ctx, prefix, clientv3.WithPrefix()) + for wresp := range rch { + for _, ev := range wresp.Events { + if(ev.Type == mvccpb.PUT || ev.Type == mvccpb.DELETE){ + _ = a.fetchstore(appID) + } + } + } +} + +func (a *appInfo)fetchstore(appID string)(err error){ + prefix := fmt.Sprintf("/%s/%s/", Prefix, appID) + resp ,err := a.e.cli.Get(a.e.ctx,prefix,clientv3.WithPrefix()) + if(err != nil){ + log.Error("etcd: fetch client.Get(%s) error(%+v)", prefix, err) + return err + } + + ins,err := a.paserIns(resp) + if(err != nil){ + return err + } + a.store(ins) + return nil +} +func (a *appInfo)store(ins *naming.InstancesInfo){ + + a.ins.Store(ins) + a.e.mutex.RLock() + for rs := range a.resolver { + select { + case rs.event <- struct{}{}: + default: + } + } + a.e.mutex.RUnlock() +} + + +func (a *appInfo)paserIns(resp *clientv3.GetResponse)(ins *naming.InstancesInfo,err error){ + ins = &naming.InstancesInfo{ + Instances:make(map[string][]*naming.Instance,0), + } + for _, ev := range resp.Kvs { + in := new(naming.Instance) + + err := json.Unmarshal(ev.Value,in) + if(err != nil){ + return nil,err + } + ins.Instances[in.Zone] = append(ins.Instances[in.Zone],in) + } + return ins,nil +} +// Watch watch instance. +func (r *Resolve) Watch() <-chan struct{} { + return r.event +} + +// Fetch fetch resolver instance. +func (r *Resolve) Fetch(ctx context.Context) (ins *naming.InstancesInfo, ok bool) { + r.e.mutex.RLock() + app, ok := r.e.apps[r.id] + r.e.mutex.RUnlock() + if ok { + ins, ok = app.ins.Load().(*naming.InstancesInfo) + return + } + return +} + +// Close close resolver. +func (r *Resolve) Close() error { + r.e.mutex.Lock() + if app, ok := r.e.apps[r.id]; ok && len(app.resolver) != 0 { + delete(app.resolver, r) + } + r.e.mutex.Unlock() + return nil +} diff --git a/pkg/naming/etcd/etcd_test.go b/pkg/naming/etcd/etcd_test.go new file mode 100644 index 000000000..f02831971 --- /dev/null +++ b/pkg/naming/etcd/etcd_test.go @@ -0,0 +1,93 @@ +package etcd + +import ( + "context" + "fmt" + "github.com/bilibili/kratos/pkg/naming" + "github.com/coreos/etcd/clientv3" + "testing" + "time" +) + +func TestNew(t *testing.T) { + + config := &clientv3.Config{ + Endpoints:[]string{"127.0.0.1:2379",}, + } + + builder := New(config) + app1 := builder.Build("app1") + + go func() { + fmt.Printf("Watch \n") + for{ + select{ + case <- app1.Watch(): + fmt.Printf("app1 节点发生变化 \n") + } + + } + + }() + + + time.Sleep(time.Second) + + app1Cancel,err := builder.Register(context.Background(),&naming.Instance{ + AppID:"app1", + Hostname:"h1", + Zone:"z1", + }) + + app2Cancel,err := builder.Register(context.Background(),&naming.Instance{ + AppID:"app2", + Hostname:"h5", + Zone:"z3", + }) + + if(err != nil){ + fmt.Println(err) + } + + app2 := builder.Build("app2") + + go func(){ + fmt.Println("节点列表") + for { + fmt.Printf("app1: ") + r1,_ := app1.Fetch(context.Background()) + if(r1 != nil){ + for z,ins := range r1.Instances{ + fmt.Printf("zone: %s :",z) + for _,in := range ins{ + fmt.Printf("app: %s host %s \n",in.AppID,in.Hostname) + } + } + } + fmt.Printf("app2: ") + r2,_ := app2.Fetch(context.Background()) + if(r2 != nil){ + for z,ins := range r2.Instances{ + fmt.Printf("zone: %s :",z) + for _,in := range ins{ + fmt.Printf("app: %s host %s \n",in.AppID,in.Hostname) + } + } + } + time.Sleep(time.Second) + } + }() + + time.Sleep(time.Second*5) + fmt.Println("取消app1") + app1Cancel() + + time.Sleep(time.Second*10) + fmt.Println("取消app2") + app2Cancel() + + + + wait := make(chan int,0) + wait <- 1 +} From a1ae2ee9676168cc21161e76deeb0bcc99e31788 Mon Sep 17 00:00:00 2001 From: "libi.chai" Date: Fri, 19 Jul 2019 09:54:28 +0800 Subject: [PATCH 2/5] rm test useless code --- pkg/naming/etcd/etcd_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/naming/etcd/etcd_test.go b/pkg/naming/etcd/etcd_test.go index 07c42caa1..28366cc0c 100644 --- a/pkg/naming/etcd/etcd_test.go +++ b/pkg/naming/etcd/etcd_test.go @@ -53,9 +53,7 @@ func TestNew(t *testing.T) { fmt.Println(err) } - fmt.Println(1111) app2 := builder.Build("app2") - fmt.Println(1111) go func() { fmt.Println("节点列表") @@ -69,6 +67,8 @@ func TestNew(t *testing.T) { fmt.Printf("app: %s host %s \n", in.AppID, in.Hostname) } } + }else{ + fmt.Println("\n") } fmt.Printf("app2: ") r2, _ := app2.Fetch(context.Background()) @@ -79,12 +79,13 @@ func TestNew(t *testing.T) { fmt.Printf("app: %s host %s \n", in.AppID, in.Hostname) } } + }else{ + fmt.Printf("\n") } time.Sleep(time.Second) } }() - fmt.Println(1111) time.Sleep(time.Second * 5) fmt.Println("取消app1") app1Cancel() @@ -93,6 +94,5 @@ func TestNew(t *testing.T) { fmt.Println("取消app2") app2Cancel() - fmt.Println(1111) time.Sleep(30*time.Second) } From 6287008c0918df33b556f8b46316558e1bf2a2cc Mon Sep 17 00:00:00 2001 From: "libi.chai" Date: Fri, 19 Jul 2019 10:51:18 +0800 Subject: [PATCH 3/5] rm test useless code --- pkg/naming/etcd/etcd_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/naming/etcd/etcd_test.go b/pkg/naming/etcd/etcd_test.go index 28366cc0c..7d8e8f7e4 100644 --- a/pkg/naming/etcd/etcd_test.go +++ b/pkg/naming/etcd/etcd_test.go @@ -68,7 +68,7 @@ func TestNew(t *testing.T) { } } }else{ - fmt.Println("\n") + fmt.Printf("\n") } fmt.Printf("app2: ") r2, _ := app2.Fetch(context.Background()) From 3b81757ce11b5263f6a8261eef440b476ca3d827 Mon Sep 17 00:00:00 2001 From: "libi.chai" Date: Fri, 19 Jul 2019 21:48:22 +0800 Subject: [PATCH 4/5] add endpoints,prefix env;gofmt --- pkg/naming/etcd/etcd.go | 56 ++++++++++++++++++++++++++++-------- pkg/naming/etcd/etcd_test.go | 14 ++++----- 2 files changed, 51 insertions(+), 19 deletions(-) diff --git a/pkg/naming/etcd/etcd.go b/pkg/naming/etcd/etcd.go index 044f5fc33..f64f107f4 100644 --- a/pkg/naming/etcd/etcd.go +++ b/pkg/naming/etcd/etcd.go @@ -4,7 +4,10 @@ import ( "context" "encoding/json" "errors" + "flag" "fmt" + "os" + "strings" "sync" "sync/atomic" "time" @@ -15,9 +18,10 @@ import ( "github.com/coreos/etcd/mvcc/mvccpb" ) -const ( +var ( //Prefix is a etcd globe key prefix - Prefix = "kratos_etcd3" + Endpoints string + Prefix string RegisterTTL = 30 ) @@ -29,10 +33,28 @@ var ( ErrDuplication = errors.New("etcd: instance duplicate registration") ) +func init() { + addFlag(flag.CommandLine) +} + +func addFlag(fs *flag.FlagSet) { + // env + fs.StringVar(&Endpoints, "etcd.endpoints", os.Getenv("ETCD_ENDPOINTS"), "etcd.endpoints is etcd endpoints. value: 127.0.0.1:2379,127.0.0.2:2379 etc.") + fs.StringVar(&Prefix, "etcd.prefix", defaultString("ETCD_PREFIX", "kratos_etcd"), "etcd globe key prefix or use ETCD_PREFIX env variable. value etcd_prefix etc.") +} + +func defaultString(env, value string) string { + v := os.Getenv(env) + if v == "" { + return value + } + return v +} + // Builder return default etcd resolver builder. func Builder(c *clientv3.Config) naming.Builder { _once.Do(func() { - _builder,_ = New(c) + _builder, _ = New(c) }) return _builder } @@ -42,7 +64,7 @@ func Build(c *clientv3.Config, id string) naming.Resolver { return Builder(c).Build(id) } -//EtcdBuilder is a etcd clientv3 EtcdBuilder +// EtcdBuilder is a etcd clientv3 EtcdBuilder type EtcdBuilder struct { cli *clientv3.Client ctx context.Context @@ -65,11 +87,21 @@ type Resolve struct { event chan struct{} e *EtcdBuilder } -//New is new a etcdbuilder -func New(c *clientv3.Config) (e *EtcdBuilder,err error) { + +// New is new a etcdbuilder +func New(c *clientv3.Config) (e *EtcdBuilder, err error) { + if c == nil { + if Endpoints == "" { + panic(fmt.Errorf("invalid etcd config endpoints:%+v", Endpoints)) + } + c = &clientv3.Config{ + Endpoints: strings.Split(Endpoints, ","), + DialTimeout: time.Second * 30, + } + } cli, err := clientv3.New(*c) if err != nil { - return nil,err + return nil, err } ctx, cancel := context.WithCancel(context.Background()) e = &EtcdBuilder{ @@ -120,7 +152,7 @@ func (e *EtcdBuilder) Scheme() string { } -//Register is register instance +// Register is register instance func (e *EtcdBuilder) Register(ctx context.Context, ins *naming.Instance) (cancelFunc context.CancelFunc, err error) { e.mutex.Lock() if _, ok := e.registry[ins.AppID]; ok { @@ -148,7 +180,7 @@ func (e *EtcdBuilder) Register(ctx context.Context, ins *naming.Instance) (cance go func() { //提前2秒续约 避免续约操作缓慢时租约过期 - ticker := time.NewTicker((RegisterTTL - 2) * time.Second) + ticker := time.NewTicker(time.Duration(RegisterTTL-2) * time.Second) defer ticker.Stop() for { select { @@ -166,7 +198,7 @@ func (e *EtcdBuilder) Register(ctx context.Context, ins *naming.Instance) (cance //注册和续约公用一个操作 func (e *EtcdBuilder) register(ctx context.Context, ins *naming.Instance) (err error) { - prefix := e.getprefix(ins) + prefix := e.keyPrefix(ins) val, _ := json.Marshal(ins) ttlResp, err := e.cli.Grant(context.TODO(), int64(RegisterTTL)) @@ -183,7 +215,7 @@ func (e *EtcdBuilder) register(ctx context.Context, ins *naming.Instance) (err e return nil } func (e *EtcdBuilder) unregister(ins *naming.Instance) (err error) { - prefix := e.getprefix(ins) + prefix := e.keyPrefix(ins) if _, err = e.cli.Delete(context.TODO(), prefix); err != nil { log.Error("etcd: unregister client.Delete(%v) appid(%s) hostname(%s) error(%v)", @@ -194,7 +226,7 @@ func (e *EtcdBuilder) unregister(ins *naming.Instance) (err error) { return } -func (e *EtcdBuilder) getprefix(ins *naming.Instance) string { +func (e *EtcdBuilder) keyPrefix(ins *naming.Instance) string { return fmt.Sprintf("/%s/%s/%s", Prefix, ins.AppID, ins.Hostname) } diff --git a/pkg/naming/etcd/etcd_test.go b/pkg/naming/etcd/etcd_test.go index 7d8e8f7e4..85cbfe69b 100644 --- a/pkg/naming/etcd/etcd_test.go +++ b/pkg/naming/etcd/etcd_test.go @@ -12,12 +12,12 @@ import ( func TestNew(t *testing.T) { config := &clientv3.Config{ - Endpoints: []string{"127.0.0.1:2379"}, - DialTimeout:time.Second*3, + Endpoints: []string{"127.0.0.1:2379"}, + DialTimeout: time.Second * 3, } - builder,err := New(config) + builder, err := New(config) - if(err != nil){ + if err != nil { fmt.Println("etcd 连接失败") return } @@ -67,7 +67,7 @@ func TestNew(t *testing.T) { fmt.Printf("app: %s host %s \n", in.AppID, in.Hostname) } } - }else{ + } else { fmt.Printf("\n") } fmt.Printf("app2: ") @@ -79,7 +79,7 @@ func TestNew(t *testing.T) { fmt.Printf("app: %s host %s \n", in.AppID, in.Hostname) } } - }else{ + } else { fmt.Printf("\n") } time.Sleep(time.Second) @@ -94,5 +94,5 @@ func TestNew(t *testing.T) { fmt.Println("取消app2") app2Cancel() - time.Sleep(30*time.Second) + time.Sleep(30 * time.Second) } From e616bd2898930e7b83afb888db22183adae7f671 Mon Sep 17 00:00:00 2001 From: "libi.chai" Date: Fri, 19 Jul 2019 22:02:25 +0800 Subject: [PATCH 5/5] change Prefix 2 etcdPrefix --- pkg/naming/etcd/etcd.go | 22 +++++++++++----------- pkg/naming/etcd/etcd_test.go | 1 - 2 files changed, 11 insertions(+), 12 deletions(-) diff --git a/pkg/naming/etcd/etcd.go b/pkg/naming/etcd/etcd.go index f64f107f4..338b216eb 100644 --- a/pkg/naming/etcd/etcd.go +++ b/pkg/naming/etcd/etcd.go @@ -19,9 +19,9 @@ import ( ) var ( - //Prefix is a etcd globe key prefix - Endpoints string - Prefix string + //etcdPrefix is a etcd globe key prefix + endpoints string + etcdPrefix string RegisterTTL = 30 ) @@ -39,8 +39,8 @@ func init() { func addFlag(fs *flag.FlagSet) { // env - fs.StringVar(&Endpoints, "etcd.endpoints", os.Getenv("ETCD_ENDPOINTS"), "etcd.endpoints is etcd endpoints. value: 127.0.0.1:2379,127.0.0.2:2379 etc.") - fs.StringVar(&Prefix, "etcd.prefix", defaultString("ETCD_PREFIX", "kratos_etcd"), "etcd globe key prefix or use ETCD_PREFIX env variable. value etcd_prefix etc.") + fs.StringVar(&endpoints, "etcd.endpoints", os.Getenv("ETCD_ENDPOINTS"), "etcd.endpoints is etcd endpoints. value: 127.0.0.1:2379,127.0.0.2:2379 etc.") + fs.StringVar(&etcdPrefix, "etcd.prefix", defaultString("ETCD_PREFIX", "kratos_etcd"), "etcd globe key prefix or use ETCD_PREFIX env variable. value etcd_prefix etc.") } func defaultString(env, value string) string { @@ -91,11 +91,11 @@ type Resolve struct { // New is new a etcdbuilder func New(c *clientv3.Config) (e *EtcdBuilder, err error) { if c == nil { - if Endpoints == "" { - panic(fmt.Errorf("invalid etcd config endpoints:%+v", Endpoints)) + if endpoints == "" { + panic(fmt.Errorf("invalid etcd config endpoints:%+v", endpoints)) } c = &clientv3.Config{ - Endpoints: strings.Split(Endpoints, ","), + Endpoints: strings.Split(endpoints, ","), DialTimeout: time.Second * 30, } } @@ -227,7 +227,7 @@ func (e *EtcdBuilder) unregister(ins *naming.Instance) (err error) { } func (e *EtcdBuilder) keyPrefix(ins *naming.Instance) string { - return fmt.Sprintf("/%s/%s/%s", Prefix, ins.AppID, ins.Hostname) + return fmt.Sprintf("/%s/%s/%s", etcdPrefix, ins.AppID, ins.Hostname) } // Close stop all running process including etcdfetch and register @@ -237,7 +237,7 @@ func (e *EtcdBuilder) Close() error { } func (a *appInfo) watch(appID string) { _ = a.fetchstore(appID) - prefix := fmt.Sprintf("/%s/%s/", Prefix, appID) + prefix := fmt.Sprintf("/%s/%s/", etcdPrefix, appID) rch := a.e.cli.Watch(a.e.ctx, prefix, clientv3.WithPrefix()) for wresp := range rch { for _, ev := range wresp.Events { @@ -249,7 +249,7 @@ func (a *appInfo) watch(appID string) { } func (a *appInfo) fetchstore(appID string) (err error) { - prefix := fmt.Sprintf("/%s/%s/", Prefix, appID) + prefix := fmt.Sprintf("/%s/%s/", etcdPrefix, appID) resp, err := a.e.cli.Get(a.e.ctx, prefix, clientv3.WithPrefix()) if err != nil { log.Error("etcd: fetch client.Get(%s) error(%+v)", prefix, err) diff --git a/pkg/naming/etcd/etcd_test.go b/pkg/naming/etcd/etcd_test.go index 85cbfe69b..445958725 100644 --- a/pkg/naming/etcd/etcd_test.go +++ b/pkg/naming/etcd/etcd_test.go @@ -42,7 +42,6 @@ func TestNew(t *testing.T) { Zone: "z1", }) - fmt.Println(2222) app2Cancel, err := builder.Register(context.Background(), &naming.Instance{ AppID: "app2", Hostname: "h5",