fix data race (#963)

* fix data race
pull/964/head
Tony Chen 4 years ago committed by GitHub
parent 9b31e6293b
commit 41ea1fbc76
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      .github/workflows/go.yml
  2. 69
      app.go
  3. 23
      transport/grpc/server.go
  4. 21
      transport/grpc/server_test.go
  5. 23
      transport/http/server.go
  6. 18
      transport/http/server_test.go
  7. 6
      transport/transport.go

@ -19,10 +19,10 @@ jobs:
go-version: 1.16
- name: Build
run: go build -v ./...
run: go build ./...
- name: Test
run: go test -v ./...
run: go test -race ./...
- name: Kratos
run: |

@ -5,12 +5,12 @@ import (
"errors"
"os"
"os/signal"
"strings"
"sync"
"syscall"
"time"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/registry"
"github.com/go-kratos/kratos/v2/transport"
"github.com/google/uuid"
"golang.org/x/sync/errgroup"
@ -40,11 +40,10 @@ func New(opts ...Option) *App {
}
ctx, cancel := context.WithCancel(options.ctx)
return &App{
opts: options,
ctx: ctx,
cancel: cancel,
instance: buildInstance(options),
log: log.NewHelper(options.logger),
ctx: ctx,
cancel: cancel,
opts: options,
log: log.NewHelper(options.logger),
}
}
@ -55,28 +54,34 @@ func (a *App) Run() error {
"service_name", a.opts.name,
"version", a.opts.version,
)
g, ctx := errgroup.WithContext(a.ctx)
instance, err := buildInstance(a.opts)
if err != nil {
return err
}
eg, ctx := errgroup.WithContext(a.ctx)
wg := sync.WaitGroup{}
for _, srv := range a.opts.servers {
srv := srv
g.Go(func() error {
eg.Go(func() error {
<-ctx.Done() // wait for stop signal
return srv.Stop()
})
g.Go(func() error {
wg.Add(1)
eg.Go(func() error {
wg.Done()
return srv.Start()
})
}
if err := a.waitForReady(ctx); err != nil {
return err
}
wg.Wait()
if a.opts.registrar != nil {
if err := a.opts.registrar.Register(a.opts.ctx, a.instance); err != nil {
if err := a.opts.registrar.Register(a.opts.ctx, instance); err != nil {
return err
}
a.instance = instance
}
c := make(chan os.Signal, 1)
signal.Notify(c, a.opts.sigs...)
g.Go(func() error {
eg.Go(func() error {
for {
select {
case <-ctx.Done():
@ -86,7 +91,7 @@ func (a *App) Run() error {
}
}
})
if err := g.Wait(); err != nil && !errors.Is(err, context.Canceled) {
if err := eg.Wait(); err != nil && !errors.Is(err, context.Canceled) {
return err
}
return nil
@ -94,7 +99,7 @@ func (a *App) Run() error {
// Stop gracefully stops the application.
func (a *App) Stop() error {
if a.opts.registrar != nil {
if a.opts.registrar != nil && a.instance != nil {
if err := a.opts.registrar.Deregister(a.opts.ctx, a.instance); err != nil {
return err
}
@ -105,30 +110,14 @@ func (a *App) Stop() error {
return nil
}
func (a *App) waitForReady(ctx context.Context) error {
retry:
for _, srv := range a.opts.servers {
if ctx.Err() != nil {
return ctx.Err()
}
e, err := srv.Endpoint()
if err != nil {
return err
}
if strings.HasSuffix(e, ":0") {
time.Sleep(time.Millisecond * 100)
goto retry
}
}
a.instance = buildInstance(a.opts)
time.Sleep(time.Second)
return nil
}
func buildInstance(o options) *registry.ServiceInstance {
func buildInstance(o options) (*registry.ServiceInstance, error) {
if len(o.endpoints) == 0 {
for _, srv := range o.servers {
if e, err := srv.Endpoint(); err == nil {
if r, ok := srv.(transport.Registry); ok {
e, err := r.Endpoint()
if err != nil {
return nil, err
}
o.endpoints = append(o.endpoints, e)
}
}
@ -139,5 +128,5 @@ func buildInstance(o options) *registry.ServiceInstance {
Version: o.version,
Metadata: o.metadata,
Endpoints: o.endpoints,
}
}, nil
}

@ -4,6 +4,7 @@ import (
"context"
"fmt"
"net"
"strings"
"time"
"github.com/go-kratos/kratos/v2/api/metadata"
@ -19,6 +20,7 @@ import (
)
var _ transport.Server = (*Server)(nil)
var _ transport.Registry = (*Server)(nil)
// ServerOption is gRPC server option.
type ServerOption func(o *Server)
@ -115,6 +117,13 @@ func NewServer(opts ...ServerOption) *Server {
// examples:
// grpc://127.0.0.1:9000?isSecure=false
func (s *Server) Endpoint() (string, error) {
if strings.HasSuffix(s.address, ":0") {
lis, err := net.Listen(s.network, s.address)
if err != nil {
return "", err
}
s.lis = lis
}
addr, err := host.Extract(s.address, s.lis)
if err != nil {
return "", err
@ -124,14 +133,16 @@ func (s *Server) Endpoint() (string, error) {
// Start start the gRPC server.
func (s *Server) Start() error {
lis, err := net.Listen(s.network, s.address)
if err != nil {
return err
if s.lis == nil {
lis, err := net.Listen(s.network, s.address)
if err != nil {
return err
}
s.lis = lis
}
s.lis = lis
s.log.Infof("[gRPC] server listening on: %s", lis.Addr().String())
s.log.Infof("[gRPC] server listening on: %s", s.lis.Addr().String())
s.health.Resume()
return s.Serve(lis)
return s.Serve(s.lis)
}
// Stop stop the gRPC server.

@ -11,18 +11,19 @@ import (
func TestServer(t *testing.T) {
srv := NewServer()
if endpoint, err := srv.Endpoint(); err != nil || endpoint == "" {
t.Fatal(endpoint, err)
if e, err := srv.Endpoint(); err != nil || e == "" {
t.Fatal(e, err)
}
time.AfterFunc(time.Second, func() {
defer srv.Stop()
testClient(t, srv)
})
// start server
if err := srv.Start(); err != nil {
t.Fatal(err)
}
go func() {
// start server
if err := srv.Start(); err != nil {
panic(err)
}
}()
time.Sleep(time.Second)
testClient(t, srv)
srv.Stop()
}
func testClient(t *testing.T, srv *Server) {

@ -6,6 +6,7 @@ import (
"fmt"
"net"
"net/http"
"strings"
"time"
"github.com/go-kratos/kratos/v2/internal/host"
@ -16,6 +17,7 @@ import (
)
var _ transport.Server = (*Server)(nil)
var _ transport.Registry = (*Server)(nil)
// ServerOption is HTTP server option.
type ServerOption func(*Server)
@ -103,6 +105,13 @@ func (s *Server) ServeHTTP(res http.ResponseWriter, req *http.Request) {
// examples:
// http://127.0.0.1:8000?isSecure=false
func (s *Server) Endpoint() (string, error) {
if strings.HasSuffix(s.address, ":0") {
lis, err := net.Listen(s.network, s.address)
if err != nil {
return "", err
}
s.lis = lis
}
addr, err := host.Extract(s.address, s.lis)
if err != nil {
return "", err
@ -112,13 +121,15 @@ func (s *Server) Endpoint() (string, error) {
// Start start the HTTP server.
func (s *Server) Start() error {
lis, err := net.Listen(s.network, s.address)
if err != nil {
return err
if s.lis == nil {
lis, err := net.Listen(s.network, s.address)
if err != nil {
return err
}
s.lis = lis
}
s.lis = lis
s.log.Infof("[HTTP] server listening on: %s", lis.Addr().String())
if err := s.Serve(lis); !errors.Is(err, http.ErrServerClosed) {
s.log.Infof("[HTTP] server listening on: %s", s.lis.Addr().String())
if err := s.Serve(s.lis); !errors.Is(err, http.ErrServerClosed) {
return err
}
return nil

@ -24,14 +24,18 @@ func TestServer(t *testing.T) {
srv := NewServer()
srv.HandleFunc("/index", fn)
time.AfterFunc(time.Second, func() {
defer srv.Stop()
testClient(t, srv)
})
if err := srv.Start(); err != nil {
t.Fatal(err)
if e, err := srv.Endpoint(); err != nil || e == "" {
t.Fatal(e, err)
}
go func() {
if err := srv.Start(); err != nil {
panic(err)
}
}()
time.Sleep(time.Second)
testClient(t, srv)
srv.Stop()
}
func testClient(t *testing.T, srv *Server) {

@ -10,9 +10,13 @@ import (
_ "github.com/go-kratos/kratos/v2/encoding/yaml"
)
// Registry is registry endpoint.
type Registry interface {
Endpoint() (string, error)
}
// Server is transport server.
type Server interface {
Endpoint() (string, error)
Start() error
Stop() error
}

Loading…
Cancel
Save