You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
324 lines
7.2 KiB
324 lines
7.2 KiB
package etcd
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"errors"
|
|
"flag"
|
|
"fmt"
|
|
"os"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"go.etcd.io/etcd/api/v3/mvccpb"
|
|
"go.etcd.io/etcd/client/v3"
|
|
"google.golang.org/grpc"
|
|
|
|
"github.com/go-kratos/kratos/pkg/log"
|
|
"github.com/go-kratos/kratos/pkg/naming"
|
|
)
|
|
|
|
var (
|
|
//etcdPrefix is a etcd globe key prefix
|
|
endpoints string
|
|
etcdPrefix string
|
|
|
|
//Time units is second
|
|
registerTTL = 90
|
|
defaultDialTimeout = 30
|
|
)
|
|
|
|
var (
|
|
_once sync.Once
|
|
_builder naming.Builder
|
|
//ErrDuplication is a register duplication err
|
|
ErrDuplication = errors.New("etcd: instance duplicate registration")
|
|
)
|
|
|
|
func init() {
|
|
addFlag(flag.CommandLine)
|
|
}
|
|
|
|
func addFlag(fs *flag.FlagSet) {
|
|
// env
|
|
fs.StringVar(&endpoints, "etcd.endpoints", os.Getenv("ETCD_ENDPOINTS"), "etcd.endpoints is etcd endpoints. value: 127.0.0.1:2379,127.0.0.2:2379 etc.")
|
|
fs.StringVar(&etcdPrefix, "etcd.prefix", defaultString("ETCD_PREFIX", "kratos_etcd"), "etcd globe key prefix or use ETCD_PREFIX env variable. value etcd_prefix etc.")
|
|
}
|
|
|
|
func defaultString(env, value string) string {
|
|
v := os.Getenv(env)
|
|
if v == "" {
|
|
return value
|
|
}
|
|
return v
|
|
}
|
|
|
|
// Builder return default etcd resolver builder.
|
|
func Builder(c *clientv3.Config) naming.Builder {
|
|
_once.Do(func() {
|
|
_builder, _ = New(c)
|
|
})
|
|
return _builder
|
|
}
|
|
|
|
// Build register resolver into default etcd.
|
|
func Build(c *clientv3.Config, id string) naming.Resolver {
|
|
return Builder(c).Build(id)
|
|
}
|
|
|
|
// EtcdBuilder is a etcd clientv3 EtcdBuilder
|
|
type EtcdBuilder struct {
|
|
cli *clientv3.Client
|
|
ctx context.Context
|
|
cancelFunc context.CancelFunc
|
|
|
|
mutex sync.RWMutex
|
|
apps map[string]*appInfo
|
|
registry map[string]struct{}
|
|
}
|
|
type appInfo struct {
|
|
resolver map[*Resolve]struct{}
|
|
ins atomic.Value
|
|
e *EtcdBuilder
|
|
once sync.Once
|
|
}
|
|
|
|
// Resolve etch resolver.
|
|
type Resolve struct {
|
|
id string
|
|
event chan struct{}
|
|
e *EtcdBuilder
|
|
opt *naming.BuildOptions
|
|
}
|
|
|
|
// New is new a etcdbuilder
|
|
func New(c *clientv3.Config) (e *EtcdBuilder, err error) {
|
|
if c == nil {
|
|
if endpoints == "" {
|
|
panic(fmt.Errorf("invalid etcd config endpoints:%+v", endpoints))
|
|
}
|
|
c = &clientv3.Config{
|
|
Endpoints: strings.Split(endpoints, ","),
|
|
DialTimeout: time.Second * time.Duration(defaultDialTimeout),
|
|
DialOptions: []grpc.DialOption{grpc.WithBlock()},
|
|
}
|
|
}
|
|
cli, err := clientv3.New(*c)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
e = &EtcdBuilder{
|
|
cli: cli,
|
|
ctx: ctx,
|
|
cancelFunc: cancel,
|
|
apps: map[string]*appInfo{},
|
|
registry: map[string]struct{}{},
|
|
}
|
|
return
|
|
}
|
|
|
|
// Build disovery resovler builder.
|
|
func (e *EtcdBuilder) Build(appid string, opts ...naming.BuildOpt) naming.Resolver {
|
|
r := &Resolve{
|
|
id: appid,
|
|
e: e,
|
|
event: make(chan struct{}, 1),
|
|
opt: new(naming.BuildOptions),
|
|
}
|
|
e.mutex.Lock()
|
|
app, ok := e.apps[appid]
|
|
if !ok {
|
|
app = &appInfo{
|
|
resolver: make(map[*Resolve]struct{}),
|
|
e: e,
|
|
}
|
|
e.apps[appid] = app
|
|
}
|
|
app.resolver[r] = struct{}{}
|
|
e.mutex.Unlock()
|
|
if ok {
|
|
select {
|
|
case r.event <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
|
|
app.once.Do(func() {
|
|
go app.watch(appid)
|
|
log.Info("etcd: AddWatch(%s) already watch(%v)", appid, ok)
|
|
})
|
|
return r
|
|
}
|
|
|
|
// Scheme return etcd's scheme
|
|
func (e *EtcdBuilder) Scheme() string {
|
|
return "etcd"
|
|
}
|
|
|
|
// Register is register instance
|
|
func (e *EtcdBuilder) Register(ctx context.Context, ins *naming.Instance) (cancelFunc context.CancelFunc, err error) {
|
|
e.mutex.Lock()
|
|
if _, ok := e.registry[ins.AppID]; ok {
|
|
err = ErrDuplication
|
|
} else {
|
|
e.registry[ins.AppID] = struct{}{}
|
|
}
|
|
e.mutex.Unlock()
|
|
if err != nil {
|
|
return
|
|
}
|
|
ctx, cancel := context.WithCancel(e.ctx)
|
|
if err = e.register(ctx, ins); err != nil {
|
|
e.mutex.Lock()
|
|
delete(e.registry, ins.AppID)
|
|
e.mutex.Unlock()
|
|
cancel()
|
|
return
|
|
}
|
|
ch := make(chan struct{}, 1)
|
|
cancelFunc = context.CancelFunc(func() {
|
|
cancel()
|
|
<-ch
|
|
})
|
|
|
|
go func() {
|
|
ticker := time.NewTicker(time.Duration(registerTTL/3) * time.Second)
|
|
defer ticker.Stop()
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
_ = e.register(ctx, ins)
|
|
case <-ctx.Done():
|
|
_ = e.unregister(ins)
|
|
ch <- struct{}{}
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
return
|
|
}
|
|
|
|
//注册和续约公用一个操作
|
|
func (e *EtcdBuilder) register(ctx context.Context, ins *naming.Instance) (err error) {
|
|
prefix := e.keyPrefix(ins)
|
|
val, _ := json.Marshal(ins)
|
|
|
|
ttlResp, err := e.cli.Grant(context.TODO(), int64(registerTTL))
|
|
if err != nil {
|
|
log.Error("etcd: register client.Grant(%v) error(%v)", registerTTL, err)
|
|
return err
|
|
}
|
|
_, err = e.cli.Put(ctx, prefix, string(val), clientv3.WithLease(ttlResp.ID))
|
|
if err != nil {
|
|
log.Error("etcd: register client.Put(%v) appid(%s) hostname(%s) error(%v)",
|
|
prefix, ins.AppID, ins.Hostname, err)
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
func (e *EtcdBuilder) unregister(ins *naming.Instance) (err error) {
|
|
prefix := e.keyPrefix(ins)
|
|
|
|
if _, err = e.cli.Delete(context.TODO(), prefix); err != nil {
|
|
log.Error("etcd: unregister client.Delete(%v) appid(%s) hostname(%s) error(%v)",
|
|
prefix, ins.AppID, ins.Hostname, err)
|
|
}
|
|
log.Info("etcd: unregister client.Delete(%v) appid(%s) hostname(%s) success",
|
|
prefix, ins.AppID, ins.Hostname)
|
|
return
|
|
}
|
|
|
|
func (e *EtcdBuilder) keyPrefix(ins *naming.Instance) string {
|
|
return fmt.Sprintf("/%s/%s/%s", etcdPrefix, ins.AppID, ins.Hostname)
|
|
}
|
|
|
|
// Close stop all running process including etcdfetch and register
|
|
func (e *EtcdBuilder) Close() error {
|
|
e.cancelFunc()
|
|
return nil
|
|
}
|
|
func (a *appInfo) watch(appID string) {
|
|
_ = a.fetchstore(appID)
|
|
prefix := fmt.Sprintf("/%s/%s/", etcdPrefix, appID)
|
|
rch := a.e.cli.Watch(a.e.ctx, prefix, clientv3.WithPrefix())
|
|
for wresp := range rch {
|
|
for _, ev := range wresp.Events {
|
|
if ev.Type == mvccpb.PUT || ev.Type == mvccpb.DELETE {
|
|
_ = a.fetchstore(appID)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (a *appInfo) fetchstore(appID string) (err error) {
|
|
prefix := fmt.Sprintf("/%s/%s/", etcdPrefix, appID)
|
|
resp, err := a.e.cli.Get(a.e.ctx, prefix, clientv3.WithPrefix())
|
|
if err != nil {
|
|
log.Error("etcd: fetch client.Get(%s) error(%+v)", prefix, err)
|
|
return err
|
|
}
|
|
|
|
ins, err := a.paserIns(resp)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
a.store(ins)
|
|
return nil
|
|
}
|
|
func (a *appInfo) store(ins *naming.InstancesInfo) {
|
|
a.ins.Store(ins)
|
|
a.e.mutex.RLock()
|
|
for rs := range a.resolver {
|
|
select {
|
|
case rs.event <- struct{}{}:
|
|
default:
|
|
}
|
|
}
|
|
a.e.mutex.RUnlock()
|
|
}
|
|
|
|
func (a *appInfo) paserIns(resp *clientv3.GetResponse) (ins *naming.InstancesInfo, err error) {
|
|
ins = &naming.InstancesInfo{
|
|
Instances: make(map[string][]*naming.Instance),
|
|
}
|
|
for _, ev := range resp.Kvs {
|
|
in := new(naming.Instance)
|
|
|
|
err := json.Unmarshal(ev.Value, in)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ins.Instances[in.Zone] = append(ins.Instances[in.Zone], in)
|
|
}
|
|
return ins, nil
|
|
}
|
|
|
|
// Watch watch instance.
|
|
func (r *Resolve) Watch() <-chan struct{} {
|
|
return r.event
|
|
}
|
|
|
|
// Fetch fetch resolver instance.
|
|
func (r *Resolve) Fetch(ctx context.Context) (ins *naming.InstancesInfo, ok bool) {
|
|
r.e.mutex.RLock()
|
|
app, ok := r.e.apps[r.id]
|
|
r.e.mutex.RUnlock()
|
|
if ok {
|
|
ins, ok = app.ins.Load().(*naming.InstancesInfo)
|
|
return
|
|
}
|
|
return
|
|
}
|
|
|
|
// Close close resolver.
|
|
func (r *Resolve) Close() error {
|
|
r.e.mutex.Lock()
|
|
if app, ok := r.e.apps[r.id]; ok && len(app.resolver) != 0 {
|
|
delete(app.resolver, r)
|
|
}
|
|
r.e.mutex.Unlock()
|
|
return nil
|
|
}
|
|
|