parent
4d47b09111
commit
272f26a26b
@ -0,0 +1,284 @@ |
||||
package etcd |
||||
|
||||
import ( |
||||
"context" |
||||
"encoding/json" |
||||
"errors" |
||||
"fmt" |
||||
"time" |
||||
"sync" |
||||
"sync/atomic" |
||||
|
||||
"github.com/bilibili/kratos/pkg/log" |
||||
"github.com/bilibili/kratos/pkg/naming" |
||||
"github.com/coreos/etcd/clientv3" |
||||
"github.com/coreos/etcd/mvcc/mvccpb" |
||||
) |
||||
const ( |
||||
Prefix = "kratos_etcd3" |
||||
|
||||
RegisterTTL = 30 |
||||
) |
||||
var ( |
||||
_once sync.Once |
||||
_builder naming.Builder |
||||
|
||||
ErrDuplication = errors.New("etcd: instance duplicate registration") |
||||
) |
||||
|
||||
// Builder return default etcd resolver builder.
|
||||
func Builder(c *clientv3.Config) naming.Builder { |
||||
_once.Do(func() { |
||||
_builder = New(c) |
||||
}) |
||||
return _builder |
||||
} |
||||
// Build register resolver into default etcd.
|
||||
func Build(c *clientv3.Config,id string) naming.Resolver { |
||||
return Builder(c).Build(id) |
||||
} |
||||
//Etcd is a etcd clientv3 builder
|
||||
type EtcdBuilder struct { |
||||
cli *clientv3.Client |
||||
ctx context.Context |
||||
cancelFunc context.CancelFunc |
||||
|
||||
mutex sync.RWMutex |
||||
apps map[string]*appInfo |
||||
registry map[string]struct{} |
||||
} |
||||
type appInfo struct { |
||||
resolver map[*Resolve]struct{} |
||||
ins atomic.Value |
||||
e *EtcdBuilder |
||||
once sync.Once |
||||
} |
||||
// Resolve etch resolver.
|
||||
type Resolve struct { |
||||
id string |
||||
event chan struct{} |
||||
e *EtcdBuilder |
||||
|
||||
} |
||||
|
||||
func New(c *clientv3.Config) (e *EtcdBuilder){ |
||||
cli,err := clientv3.New(*c) |
||||
if(err != nil){ |
||||
panic(err) |
||||
} |
||||
ctx, cancel := context.WithCancel(context.Background()) |
||||
e = &EtcdBuilder{ |
||||
cli: cli, |
||||
ctx: ctx, |
||||
cancelFunc: cancel, |
||||
apps: map[string]*appInfo{}, |
||||
registry: map[string]struct{}{}, |
||||
delete: make(chan *appInfo, 10), |
||||
} |
||||
return |
||||
} |
||||
|
||||
// Build disovery resovler builder.
|
||||
func (e *EtcdBuilder) Build(appid string) naming.Resolver { |
||||
r := &Resolve{ |
||||
id: appid, |
||||
e: e, |
||||
event: make(chan struct{}, 1), |
||||
} |
||||
e.mutex.Lock() |
||||
app, ok := e.apps[appid] |
||||
if !ok { |
||||
app = &appInfo{ |
||||
resolver: make(map[*Resolve]struct{}), |
||||
e:e, |
||||
} |
||||
e.apps[appid] = app |
||||
} |
||||
app.resolver[r] = struct{}{} |
||||
e.mutex.Unlock() |
||||
if ok { |
||||
select { |
||||
case r.event <- struct{}{}: |
||||
default: |
||||
} |
||||
} |
||||
|
||||
app.once.Do(func(){ |
||||
go app.watch(appid) |
||||
log.Info("etcd: AddWatch(%s) already watch(%v)", appid, ok) |
||||
}) |
||||
return r |
||||
} |
||||
|
||||
// Scheme return etcd's scheme
|
||||
func (e *EtcdBuilder) Scheme() string { |
||||
return "etcd" |
||||
|
||||
} |
||||
|
||||
//
|
||||
func (e *EtcdBuilder) Register(ctx context.Context, ins *naming.Instance) (cancelFunc context.CancelFunc, err error) { |
||||
e.mutex.Lock() |
||||
if _, ok := e.registry[ins.AppID]; ok { |
||||
err = ErrDuplication |
||||
} else { |
||||
e.registry[ins.AppID] = struct{}{} |
||||
} |
||||
e.mutex.Unlock() |
||||
if err != nil { |
||||
return |
||||
} |
||||
|
||||
ctx, cancel := context.WithCancel(e.ctx) |
||||
if err = e.register(ctx, ins); err != nil { |
||||
e.mutex.Lock() |
||||
delete(e.registry, ins.AppID) |
||||
e.mutex.Unlock() |
||||
cancel() |
||||
return |
||||
} |
||||
ch := make(chan struct{}, 1) |
||||
cancelFunc = context.CancelFunc(func() { |
||||
cancel() |
||||
<-ch |
||||
}) |
||||
|
||||
go func() { |
||||
//提前2秒续约 避免续约操作缓慢时租约过期
|
||||
ticker := time.NewTicker((RegisterTTL-2)*time.Second) |
||||
defer ticker.Stop() |
||||
for { |
||||
select { |
||||
case <-ticker.C: |
||||
_ = e.register(ctx, ins) |
||||
case <-ctx.Done(): |
||||
_ = e.unregister(ins) |
||||
ch <- struct{}{} |
||||
return |
||||
} |
||||
} |
||||
}() |
||||
return |
||||
} |
||||
//注册和续约公用一个操作
|
||||
func (e *EtcdBuilder)register(ctx context.Context, ins *naming.Instance) (err error){ |
||||
prefix := e.getprefix(ins) |
||||
val ,_ := json.Marshal(ins) |
||||
|
||||
ttlResp, err := e.cli.Grant(context.TODO(), int64(RegisterTTL)) |
||||
if(err != nil){ |
||||
log.Error("etcd: register client.Grant(%v) error(%v)",RegisterTTL,err) |
||||
return err |
||||
} |
||||
_,err = e.cli.Put(ctx,prefix,string(val),clientv3.WithLease(ttlResp.ID)) |
||||
if(err != nil){ |
||||
log.Error("etcd: register client.Put(%v) appid(%s) hostname(%s) error(%v)", |
||||
prefix,ins.AppID,ins.Hostname, err) |
||||
return err |
||||
} |
||||
return nil |
||||
} |
||||
func (e *EtcdBuilder)unregister(ins *naming.Instance) (err error){ |
||||
prefix := e.getprefix(ins) |
||||
|
||||
if _,err = e.cli.Delete(context.TODO(),prefix); err != nil{ |
||||
log.Error("etcd: unregister client.Delete(%v) appid(%s) hostname(%s) error(%v)", |
||||
prefix, ins.AppID, ins.Hostname, err) |
||||
} |
||||
log.Info("etcd: unregister client.Delete(%v) appid(%s) hostname(%s) success", |
||||
prefix, ins.AppID, ins.Hostname) |
||||
return |
||||
} |
||||
|
||||
|
||||
func (e *EtcdBuilder)getprefix(ins *naming.Instance) string{ |
||||
return fmt.Sprintf("/%s/%s/%s", Prefix, ins.AppID,ins.Hostname) |
||||
} |
||||
// Close stop all running process including etcdfetch and register
|
||||
func (e *EtcdBuilder) Close() error { |
||||
e.cancelFunc() |
||||
return nil |
||||
} |
||||
func (a *appInfo)watch(appID string){ |
||||
_ = a.fetchstore(appID) |
||||
prefix := fmt.Sprintf("/%s/%s/",Prefix,appID) |
||||
rch := a.e.cli.Watch(a.e.ctx, prefix, clientv3.WithPrefix()) |
||||
for wresp := range rch { |
||||
for _, ev := range wresp.Events { |
||||
if(ev.Type == mvccpb.PUT || ev.Type == mvccpb.DELETE){ |
||||
_ = a.fetchstore(appID) |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
func (a *appInfo)fetchstore(appID string)(err error){ |
||||
prefix := fmt.Sprintf("/%s/%s/", Prefix, appID) |
||||
resp ,err := a.e.cli.Get(a.e.ctx,prefix,clientv3.WithPrefix()) |
||||
if(err != nil){ |
||||
log.Error("etcd: fetch client.Get(%s) error(%+v)", prefix, err) |
||||
return err |
||||
} |
||||
|
||||
ins,err := a.paserIns(resp) |
||||
if(err != nil){ |
||||
return err |
||||
} |
||||
a.store(ins) |
||||
return nil |
||||
} |
||||
func (a *appInfo)store(ins *naming.InstancesInfo){ |
||||
|
||||
a.ins.Store(ins) |
||||
a.e.mutex.RLock() |
||||
for rs := range a.resolver { |
||||
select { |
||||
case rs.event <- struct{}{}: |
||||
default: |
||||
} |
||||
} |
||||
a.e.mutex.RUnlock() |
||||
} |
||||
|
||||
|
||||
func (a *appInfo)paserIns(resp *clientv3.GetResponse)(ins *naming.InstancesInfo,err error){ |
||||
ins = &naming.InstancesInfo{ |
||||
Instances:make(map[string][]*naming.Instance,0), |
||||
} |
||||
for _, ev := range resp.Kvs { |
||||
in := new(naming.Instance) |
||||
|
||||
err := json.Unmarshal(ev.Value,in) |
||||
if(err != nil){ |
||||
return nil,err |
||||
} |
||||
ins.Instances[in.Zone] = append(ins.Instances[in.Zone],in) |
||||
} |
||||
return ins,nil |
||||
} |
||||
// Watch watch instance.
|
||||
func (r *Resolve) Watch() <-chan struct{} { |
||||
return r.event |
||||
} |
||||
|
||||
// Fetch fetch resolver instance.
|
||||
func (r *Resolve) Fetch(ctx context.Context) (ins *naming.InstancesInfo, ok bool) { |
||||
r.e.mutex.RLock() |
||||
app, ok := r.e.apps[r.id] |
||||
r.e.mutex.RUnlock() |
||||
if ok { |
||||
ins, ok = app.ins.Load().(*naming.InstancesInfo) |
||||
return |
||||
} |
||||
return |
||||
} |
||||
|
||||
// Close close resolver.
|
||||
func (r *Resolve) Close() error { |
||||
r.e.mutex.Lock() |
||||
if app, ok := r.e.apps[r.id]; ok && len(app.resolver) != 0 { |
||||
delete(app.resolver, r) |
||||
} |
||||
r.e.mutex.Unlock() |
||||
return nil |
||||
} |
@ -0,0 +1,93 @@ |
||||
package etcd |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"github.com/bilibili/kratos/pkg/naming" |
||||
"github.com/coreos/etcd/clientv3" |
||||
"testing" |
||||
"time" |
||||
) |
||||
|
||||
func TestNew(t *testing.T) { |
||||
|
||||
config := &clientv3.Config{ |
||||
Endpoints:[]string{"127.0.0.1:2379",}, |
||||
} |
||||
|
||||
builder := New(config) |
||||
app1 := builder.Build("app1") |
||||
|
||||
go func() { |
||||
fmt.Printf("Watch \n") |
||||
for{ |
||||
select{ |
||||
case <- app1.Watch(): |
||||
fmt.Printf("app1 节点发生变化 \n") |
||||
} |
||||
|
||||
} |
||||
|
||||
}() |
||||
|
||||
|
||||
time.Sleep(time.Second) |
||||
|
||||
app1Cancel,err := builder.Register(context.Background(),&naming.Instance{ |
||||
AppID:"app1", |
||||
Hostname:"h1", |
||||
Zone:"z1", |
||||
}) |
||||
|
||||
app2Cancel,err := builder.Register(context.Background(),&naming.Instance{ |
||||
AppID:"app2", |
||||
Hostname:"h5", |
||||
Zone:"z3", |
||||
}) |
||||
|
||||
if(err != nil){ |
||||
fmt.Println(err) |
||||
} |
||||
|
||||
app2 := builder.Build("app2") |
||||
|
||||
go func(){ |
||||
fmt.Println("节点列表") |
||||
for { |
||||
fmt.Printf("app1: ") |
||||
r1,_ := app1.Fetch(context.Background()) |
||||
if(r1 != nil){ |
||||
for z,ins := range r1.Instances{ |
||||
fmt.Printf("zone: %s :",z) |
||||
for _,in := range ins{ |
||||
fmt.Printf("app: %s host %s \n",in.AppID,in.Hostname) |
||||
} |
||||
} |
||||
} |
||||
fmt.Printf("app2: ") |
||||
r2,_ := app2.Fetch(context.Background()) |
||||
if(r2 != nil){ |
||||
for z,ins := range r2.Instances{ |
||||
fmt.Printf("zone: %s :",z) |
||||
for _,in := range ins{ |
||||
fmt.Printf("app: %s host %s \n",in.AppID,in.Hostname) |
||||
} |
||||
} |
||||
} |
||||
time.Sleep(time.Second) |
||||
} |
||||
}() |
||||
|
||||
time.Sleep(time.Second*5) |
||||
fmt.Println("取消app1") |
||||
app1Cancel() |
||||
|
||||
time.Sleep(time.Second*10) |
||||
fmt.Println("取消app2") |
||||
app2Cancel() |
||||
|
||||
|
||||
|
||||
wait := make(chan int,0) |
||||
wait <- 1 |
||||
} |
Loading…
Reference in new issue