Revert/root ctx v2 (#1088)

* Revert "remove app info (#1081)"

This reverts commit 1dab58616b.

* add http base ctx

Co-authored-by: longXboy <longxboyhi@gmail.com>
pull/1090/head
Tony Chen 3 years ago committed by GitHub
parent b4dd478bc9
commit 59f54b2661
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 45
      app.go
  2. 3
      examples/metadata/server/main.go
  3. 115
      internal/context/context.go
  4. 6
      transport/grpc/server.go
  5. 2
      transport/http/client.go
  6. 3
      transport/http/server.go

@ -16,13 +16,21 @@ import (
"golang.org/x/sync/errgroup"
)
// App is an application components lifecycle manager
// AppInfo is application context value.
type AppInfo interface {
ID() string
Name() string
Version() string
Metadata() map[string]string
Endpoint() []string
}
// App is an application components lifecycle manager.
type App struct {
opts options
ctx context.Context
cancel func()
instance *registry.ServiceInstance
log *log.Helper
}
// New create an application lifecycle manager.
@ -43,17 +51,32 @@ func New(opts ...Option) *App {
ctx: ctx,
cancel: cancel,
opts: options,
log: log.NewHelper(options.logger),
}
}
// ID returns app instance id.
func (a *App) ID() string { return a.opts.id }
// Name returns service name.
func (a *App) Name() string { return a.opts.name }
// Version returns app version.
func (a *App) Version() string { return a.opts.version }
// Metadata returns service metadata.
func (a *App) Metadata() map[string]string { return a.opts.metadata }
// Endpoint returns endpoints.
func (a *App) Endpoint() []string { return a.instance.Endpoints }
// Run executes all OnStart hooks registered with the application's Lifecycle.
func (a *App) Run() error {
instance, err := a.buildInstance()
if err != nil {
return err
}
eg, ctx := errgroup.WithContext(a.ctx)
ctx := NewContext(a.ctx, a)
eg, ctx := errgroup.WithContext(ctx)
wg := sync.WaitGroup{}
for _, srv := range a.opts.servers {
srv := srv
@ -121,7 +144,6 @@ func (a *App) buildInstance() (*registry.ServiceInstance, error) {
}
}
}
return &registry.ServiceInstance{
ID: a.opts.id,
Name: a.opts.name,
@ -130,3 +152,16 @@ func (a *App) buildInstance() (*registry.ServiceInstance, error) {
Endpoints: endpoints,
}, nil
}
type appKey struct{}
// NewContext returns a new Context that carries value.
func NewContext(ctx context.Context, s AppInfo) context.Context {
return context.WithValue(ctx, appKey{}, s)
}
// FromContext returns the Transport value stored in ctx, if any.
func FromContext(ctx context.Context) (s AppInfo, ok bool) {
s, ok = ctx.Value(appKey{}).(AppInfo)
return
}

@ -32,7 +32,8 @@ func (s *server) SayHello(ctx context.Context, in *helloworld.HelloRequest) (*he
if md, ok := metadata.FromServerContext(ctx); ok {
extra = md.Get("x-md-global-extra")
}
return &helloworld.HelloReply{Message: fmt.Sprintf("Hello %s and %s", in.Name, extra)}, nil
info, _ := kratos.FromContext(ctx)
return &helloworld.HelloReply{Message: fmt.Sprintf("Hello %s extra: %s name: %s", in.Name, extra, info.Name())}, nil
}
func main() {

@ -0,0 +1,115 @@
package context
import (
"context"
"sync"
"sync/atomic"
"time"
)
type mergeCtx struct {
parent1, parent2 context.Context
done chan struct{}
doneMark uint32
doneOnce sync.Once
doneErr error
cancelCh chan struct{}
cancelOnce sync.Once
}
// Merge merges two contexts into one.
func Merge(parent1, parent2 context.Context) (context.Context, context.CancelFunc) {
mc := &mergeCtx{
parent1: parent1,
parent2: parent2,
done: make(chan struct{}),
cancelCh: make(chan struct{}),
}
select {
case <-parent1.Done():
mc.finish(parent1.Err())
case <-parent2.Done():
mc.finish(parent2.Err())
default:
go mc.wait()
}
return mc, mc.cancel
}
func (mc *mergeCtx) finish(err error) error {
mc.doneOnce.Do(func() {
mc.doneErr = err
atomic.StoreUint32(&mc.doneMark, 1)
close(mc.done)
})
return mc.doneErr
}
func (mc *mergeCtx) wait() {
var err error
select {
case <-mc.parent1.Done():
err = mc.parent1.Err()
case <-mc.parent2.Done():
err = mc.parent2.Err()
case <-mc.cancelCh:
err = context.Canceled
}
mc.finish(err)
}
func (mc *mergeCtx) cancel() {
mc.cancelOnce.Do(func() {
close(mc.cancelCh)
})
}
// Done implements context.Context.
func (mc *mergeCtx) Done() <-chan struct{} {
return mc.done
}
// Err implements context.Context.
func (mc *mergeCtx) Err() error {
if atomic.LoadUint32(&mc.doneMark) != 0 {
return mc.doneErr
}
var err error
select {
case <-mc.parent1.Done():
err = mc.parent1.Err()
case <-mc.parent2.Done():
err = mc.parent2.Err()
case <-mc.cancelCh:
err = context.Canceled
default:
return nil
}
return mc.finish(err)
}
// Deadline implements context.Context.
func (mc *mergeCtx) Deadline() (time.Time, bool) {
d1, ok1 := mc.parent1.Deadline()
d2, ok2 := mc.parent2.Deadline()
switch {
case !ok1:
return d2, ok2
case !ok2:
return d1, ok1
case d1.Before(d2):
return d1, true
default:
return d2, true
}
}
// Value implements context.Context.
func (mc *mergeCtx) Value(key interface{}) interface{} {
if v := mc.parent1.Value(key); v != nil {
return v
}
return mc.parent2.Value(key)
}

@ -8,6 +8,7 @@ import (
"time"
apimd "github.com/go-kratos/kratos/v2/api/metadata"
ic "github.com/go-kratos/kratos/v2/internal/context"
"github.com/go-kratos/kratos/v2/internal/host"
"github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/middleware"
@ -78,6 +79,7 @@ func Options(opts ...grpc.ServerOption) ServerOption {
// Server is a gRPC server wrapper.
type Server struct {
*grpc.Server
ctx context.Context
lis net.Listener
once sync.Once
err error
@ -156,6 +158,7 @@ func (s *Server) Start(ctx context.Context) error {
if _, err := s.Endpoint(); err != nil {
return err
}
s.ctx = ctx
s.log.Infof("[gRPC] server listening on: %s", s.lis.Addr().String())
s.health.Resume()
return s.Serve(s.lis)
@ -171,7 +174,7 @@ func (s *Server) Stop(ctx context.Context) error {
func (s *Server) unaryServerInterceptor() grpc.UnaryServerInterceptor {
return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) {
ctx, cancel := context.WithCancel(ctx)
ctx, cancel := ic.Merge(ctx, s.ctx)
defer cancel()
md, _ := grpcmd.FromIncomingContext(ctx)
ctx = transport.NewServerContext(ctx, &Transport{
@ -180,7 +183,6 @@ func (s *Server) unaryServerInterceptor() grpc.UnaryServerInterceptor {
header: headerCarrier(md),
})
if s.timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, s.timeout)
defer cancel()
}

@ -222,9 +222,9 @@ func (client *Client) invoke(ctx context.Context, req *http.Request, args interf
if err != nil {
return nil, errors.ServiceUnavailable("NODE_NOT_FOUND", err.Error())
}
req = req.Clone(ctx)
req.URL.Scheme = scheme
req.URL.Host = addr
req.Host = addr
}
res, err := client.do(ctx, req, c)
if done != nil {

@ -214,6 +214,9 @@ func (s *Server) Start(ctx context.Context) error {
if _, err := s.Endpoint(); err != nil {
return err
}
s.BaseContext = func(net.Listener) context.Context {
return ctx
}
s.log.Infof("[HTTP] server listening on: %s", s.lis.Addr().String())
if err := s.Serve(s.lis); !errors.Is(err, http.ErrServerClosed) {
return err

Loading…
Cancel
Save