You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
kratos/contrib/registry/zookeeper/register.go

153 lines
3.5 KiB

package zookeeper
import (
"context"
"path"
"github.com/go-zookeeper/zk"
"golang.org/x/sync/singleflight"
"github.com/go-kratos/kratos/v2/registry"
)
var (
_ registry.Registrar = &Registry{}
_ registry.Discovery = &Registry{}
)
// Option is etcd registry option.
type Option func(o *options)
type options struct {
namespace string
user string
password string
}
// WithRootPath with registry root path.
func WithRootPath(path string) Option {
return func(o *options) { o.namespace = path }
}
// WithDigestACL with registry password.
func WithDigestACL(user string, password string) Option {
return func(o *options) {
o.user = user
o.password = password
}
}
// Registry is consul registry
type Registry struct {
opts *options
conn *zk.Conn
group singleflight.Group
}
func New(conn *zk.Conn, opts ...Option) *Registry {
options := &options{
namespace: "/microservices",
}
for _, o := range opts {
o(options)
}
return &Registry{
opts: options,
conn: conn,
}
}
func (r *Registry) Register(ctx context.Context, service *registry.ServiceInstance) error {
var (
data []byte
err error
)
if err = r.ensureName(r.opts.namespace, []byte(""), 0); err != nil {
return err
}
serviceNamePath := path.Join(r.opts.namespace, service.Name)
if err = r.ensureName(serviceNamePath, []byte(""), 0); err != nil {
return err
}
if data, err = marshal(service); err != nil {
return err
}
servicePath := path.Join(serviceNamePath, service.ID)
if err = r.ensureName(servicePath, data, zk.FlagEphemeral); err != nil {
return err
}
return nil
}
// Deregister registry service to zookeeper.
func (r *Registry) Deregister(ctx context.Context, service *registry.ServiceInstance) error {
ch := make(chan error, 1)
servicePath := path.Join(r.opts.namespace, service.Name, service.ID)
go func() {
err := r.conn.Delete(servicePath, -1)
ch <- err
}()
var err error
select {
case <-ctx.Done():
err = ctx.Err()
case err = <-ch:
}
return err
}
// GetService get services from zookeeper
func (r *Registry) GetService(ctx context.Context, serviceName string) ([]*registry.ServiceInstance, error) {
instances, err, _ := r.group.Do(serviceName, func() (interface{}, error) {
serviceNamePath := path.Join(r.opts.namespace, serviceName)
servicesID, _, err := r.conn.Children(serviceNamePath)
if err != nil {
return nil, err
}
items := make([]*registry.ServiceInstance, 0, len(servicesID))
for _, service := range servicesID {
servicePath := path.Join(serviceNamePath, service)
serviceInstanceByte, _, err := r.conn.Get(servicePath)
if err != nil {
return nil, err
}
item, err := unmarshal(serviceInstanceByte)
if err != nil {
return nil, err
}
items = append(items, item)
}
return items, nil
})
if err != nil {
return nil, err
}
return instances.([]*registry.ServiceInstance), nil
}
func (r *Registry) Watch(ctx context.Context, serviceName string) (registry.Watcher, error) {
prefix := path.Join(r.opts.namespace, serviceName)
return newWatcher(ctx, prefix, serviceName, r.conn)
}
// ensureName ensure node exists, if not exist, create and set data
func (r *Registry) ensureName(path string, data []byte, flags int32) error {
exists, _, err := r.conn.Exists(path)
if err != nil {
return err
}
if !exists {
var err error
if len(r.opts.user) > 0 && len(r.opts.password) > 0 {
_, err = r.conn.Create(path, data, flags, zk.DigestACL(zk.PermAll, r.opts.user, r.opts.password))
} else {
_, err = r.conn.Create(path, data, flags, zk.WorldACL(zk.PermAll))
}
if err != nil {
return err
}
}
return nil
}