fix discovery resolver watch goroutine leak (#750)

* fix discovery resolver watch goroutine leak
pull/752/head
libi 4 years ago committed by GitHub
parent 39886e0a0c
commit 26cf7c80ad
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      cmd/protoc-gen-go-errors/go.sum
  2. 3
      transport/grpc/resolver/discovery/builder.go
  3. 12
      transport/grpc/resolver/discovery/resolver.go
  4. 58
      transport/grpc/resolver/discovery/resolver_test.go

@ -13,6 +13,7 @@ github.com/go-kratos/kratos/v2 v2.0.0-20210217083752-d86d233d93ce h1:LfOsLN9s8tA
github.com/go-kratos/kratos/v2 v2.0.0-20210217083752-d86d233d93ce/go.mod h1:oLvFyDBJkkWN8TPqb+NmpvRrSy9uM/K+XQubVRc11a8=
github.com/go-kratos/kratos/v2 v2.0.0-alpha4 h1:MKkkSZigSMg7Kx8HzrobZ93zlgmi0tAKWM9bMf6YTpU=
github.com/go-kratos/kratos/v2 v2.0.0-alpha4/go.mod h1:oLvFyDBJkkWN8TPqb+NmpvRrSy9uM/K+XQubVRc11a8=
github.com/go-kratos/kratos/v2 v2.0.0-alpha5/go.mod h1:oLvFyDBJkkWN8TPqb+NmpvRrSy9uM/K+XQubVRc11a8=
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=

@ -42,10 +42,13 @@ func (d *builder) Build(target resolver.Target, cc resolver.ClientConn, opts res
if err != nil {
return nil, err
}
ctx, cancel := context.WithCancel(context.Background())
r := &discoveryResolver{
w: w,
cc: cc,
log: log.NewHelper("grpc/resolver/discovery", d.logger),
ctx: ctx,
cancel: cancel,
}
go r.watch()
return r, nil

@ -1,6 +1,7 @@
package discovery
import (
"context"
"net/url"
"time"
@ -14,10 +15,20 @@ type discoveryResolver struct {
w registry.Watcher
cc resolver.ClientConn
log *log.Helper
ctx context.Context
cancel context.CancelFunc
}
func (r *discoveryResolver) watch() {
for {
select {
case <-r.ctx.Done():
//goroutine exit
return
default:
}
ins, err := r.w.Next()
if err != nil {
r.log.Errorf("Failed to watch discovery endpoint: %v", err)
@ -47,6 +58,7 @@ func (r *discoveryResolver) update(ins []*registry.ServiceInstance) {
}
func (r *discoveryResolver) Close() {
r.cancel()
r.w.Close()
}

@ -0,0 +1,58 @@
package discovery
import (
"context"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/registry"
"google.golang.org/grpc/resolver"
"testing"
"time"
)
type testClientConn struct {
resolver.ClientConn // For unimplemented functions
te *testing.T
}
func (t *testClientConn) UpdateState(s resolver.State) {
t.te.Log("UpdateState", s)
}
type testWatch struct {
}
func (m *testWatch) Next() ([]*registry.ServiceInstance, error) {
time.Sleep(time.Millisecond * 200)
ins := []*registry.ServiceInstance{
&registry.ServiceInstance{
ID: "mock_ID",
Name: "mock_Name",
Version: "mock_Version",
},
}
return ins, nil
}
// Watch creates a watcher according to the service name.
func (m *testWatch) Close() error {
return nil
}
func TestWatch(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
r := &discoveryResolver{
w: &testWatch{},
cc: &testClientConn{te: t},
log: log.NewHelper("grpc/resolver/discovery", log.DefaultLogger),
ctx: ctx,
cancel: cancel,
}
go func() {
time.Sleep(time.Second * 2)
r.Close()
}()
r.watch()
t.Log("watch goroutine exited after 2 second")
}
Loading…
Cancel
Save