fix(registry/discovery): watch quit while context cancel; supplement metadata (#1524)

* fix(registry/discovery): watch quit with context cancel; lose metadata while translating ServiceInstance.
pull/1533/head
Yeqllo 3 years ago committed by GitHub
parent 74e630c21a
commit a2f53128cf
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 22
      contrib/registry/discovery/discovery_helper.go
  2. 9
      contrib/registry/discovery/impl_discover.go

@ -112,17 +112,25 @@ func toServiceInstance(ins *discoveryInstance) *registry.ServiceInstance {
return nil return nil
} }
return &registry.ServiceInstance{ md := map[string]string{
ID: ins.Metadata[_reservedInstanceIDKey],
Name: ins.AppID,
Version: ins.Version,
Metadata: map[string]string{
"region": ins.Region, "region": ins.Region,
"zone": ins.Region, "zone": ins.Zone,
"lastTs": strconv.Itoa(int(ins.LastTs)), "lastTs": strconv.Itoa(int(ins.LastTs)),
"env": ins.Env, "env": ins.Env,
"hostname": ins.Hostname, "hostname": ins.Hostname,
}, }
if len(ins.Metadata) != 0 {
for k, v := range ins.Metadata {
md[k] = v
}
}
return &registry.ServiceInstance{
ID: ins.Metadata[_reservedInstanceIDKey],
Name: ins.AppID,
Version: ins.Version,
Metadata: md,
Endpoints: ins.Addrs, Endpoints: ins.Addrs,
} }
} }

@ -46,19 +46,26 @@ func (d *Discovery) Watch(ctx context.Context, serviceName string) (registry.Wat
return &watcher{ return &watcher{
Resolve: d.resolveBuild(serviceName), Resolve: d.resolveBuild(serviceName),
serviceName: serviceName, serviceName: serviceName,
cancelCtx: ctx,
}, nil }, nil
} }
type watcher struct { type watcher struct {
*Resolve *Resolve
cancelCtx context.Context
serviceName string serviceName string
} }
func (w *watcher) Next() ([]*registry.ServiceInstance, error) { func (w *watcher) Next() ([]*registry.ServiceInstance, error) {
event := w.Resolve.Watch() event := w.Resolve.Watch()
select {
case <-event:
// change event come // change event come
<-event case <-w.cancelCtx.Done():
return nil, fmt.Errorf("watch context cancelled: %v", w.cancelCtx.Err())
}
ctx, cancel := context.WithTimeout(context.TODO(), 15*time.Second) ctx, cancel := context.WithTimeout(context.TODO(), 15*time.Second)
defer cancel() defer cancel()

Loading…
Cancel
Save