|
|
|
@ -135,12 +135,20 @@ func (r *Registry) Watch(ctx context.Context, serviceName string) (registry.Watc |
|
|
|
|
|
|
|
|
|
// ensureName ensure node exists, if not exist, create and set data
|
|
|
|
|
func (r *Registry) ensureName(path string, data []byte, flags int32) error { |
|
|
|
|
exists, _, err := r.conn.Exists(path) |
|
|
|
|
exists, stat, err := r.conn.Exists(path) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
// ephemeral nodes handling after restart
|
|
|
|
|
// fixes a race condition if the server crashes without using CreateProtectedEphemeralSequential()
|
|
|
|
|
if flags&zk.FlagEphemeral == zk.FlagEphemeral { |
|
|
|
|
err = r.conn.Delete(path, stat.Version) |
|
|
|
|
if err != nil && err != zk.ErrNoNode { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
exists = false |
|
|
|
|
} |
|
|
|
|
if !exists { |
|
|
|
|
var err error |
|
|
|
|
if len(r.opts.user) > 0 && len(r.opts.password) > 0 { |
|
|
|
|
_, err = r.conn.Create(path, data, flags, zk.DigestACL(zk.PermAll, r.opts.user, r.opts.password)) |
|
|
|
|
} else { |
|
|
|
|