Compare commits

...

4 Commits

Author SHA1 Message Date
haiyux 156c97181e remove examples 3 years ago
haiyux 5b6bf88a27 merge main 3 years ago
haiyux 49f700124b fix lint 3 years ago
haiyux 2f74c9dfae feat:add grpc health check 3 years ago
  1. 21
      app.go
  2. 8
      app_test.go
  3. 86
      health/health.go
  4. 137
      health/health.pb.go
  5. 12
      health/health.proto
  6. 15
      health/option.go
  7. 99
      transport/grpc/health/health.go
  8. 6
      transport/grpc/server.go

@ -9,6 +9,7 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/go-kratos/kratos/v2/health"
"github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/registry" "github.com/go-kratos/kratos/v2/registry"
"github.com/go-kratos/kratos/v2/transport" "github.com/go-kratos/kratos/v2/transport"
@ -24,6 +25,7 @@ type AppInfo interface {
Version() string Version() string
Metadata() map[string]string Metadata() map[string]string
Endpoint() []string Endpoint() []string
Health() *health.Health
} }
// App is an application components lifecycle manager. // App is an application components lifecycle manager.
@ -33,6 +35,7 @@ type App struct {
cancel func() cancel func()
lk sync.Mutex lk sync.Mutex
instance *registry.ServiceInstance instance *registry.ServiceInstance
health *health.Health
} }
// New create an application lifecycle manager. // New create an application lifecycle manager.
@ -55,6 +58,7 @@ func New(opts ...Option) *App {
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
opts: o, opts: o,
health: health.New(),
} }
} }
@ -78,6 +82,9 @@ func (a *App) Endpoint() []string {
return a.instance.Endpoints return a.instance.Endpoints
} }
// Health returns health
func (a *App) Health() *health.Health { return a.health }
// Run executes all OnStart hooks registered with the application's Lifecycle. // Run executes all OnStart hooks registered with the application's Lifecycle.
func (a *App) Run() error { func (a *App) Run() error {
instance, err := a.buildInstance() instance, err := a.buildInstance()
@ -101,10 +108,14 @@ func (a *App) Run() error {
}) })
} }
wg.Wait() wg.Wait()
err = a.health.SetStatus(a.Name(), health.Status_SERVING)
if err != nil {
log.Errorf("set health status error: %v", err)
}
if a.opts.registrar != nil { if a.opts.registrar != nil {
rctx, rcancel := context.WithTimeout(ctx, a.opts.registrarTimeout) rctx, rcancel := context.WithTimeout(ctx, a.opts.registrarTimeout)
defer rcancel() defer rcancel()
if err := a.opts.registrar.Register(rctx, instance); err != nil { if err = a.opts.registrar.Register(rctx, instance); err != nil {
return err return err
} }
a.lk.Lock() a.lk.Lock()
@ -117,6 +128,10 @@ func (a *App) Run() error {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
err = a.health.SetStatus(a.Name(), health.Status_NOT_SERVING)
if err != nil {
log.Errorf("set health status error: %v", err)
}
return ctx.Err() return ctx.Err()
case <-c: case <-c:
if err := a.Stop(); err != nil { if err := a.Stop(); err != nil {
@ -134,6 +149,10 @@ func (a *App) Run() error {
// Stop gracefully stops the application. // Stop gracefully stops the application.
func (a *App) Stop() error { func (a *App) Stop() error {
err := a.health.SetStatus(a.Name(), health.Status_NOT_SERVING)
if err != nil {
log.Errorf("set health status error: %v", err)
}
a.lk.Lock() a.lk.Lock()
instance := a.instance instance := a.instance
a.lk.Unlock() a.lk.Unlock()

@ -9,8 +9,6 @@ import (
"time" "time"
"github.com/go-kratos/kratos/v2/registry" "github.com/go-kratos/kratos/v2/registry"
"github.com/go-kratos/kratos/v2/transport/grpc"
"github.com/go-kratos/kratos/v2/transport/http"
) )
type mockRegistry struct { type mockRegistry struct {
@ -40,12 +38,12 @@ func (r *mockRegistry) Deregister(ctx context.Context, service *registry.Service
} }
func TestApp(t *testing.T) { func TestApp(t *testing.T) {
hs := http.NewServer() // hs := http.NewServer()
gs := grpc.NewServer() // gs := grpc.NewServer()
app := New( app := New(
Name("kratos"), Name("kratos"),
Version("v1.0.0"), Version("v1.0.0"),
Server(hs, gs), // Server(hs, gs),
Registrar(&mockRegistry{service: make(map[string]*registry.ServiceInstance)}), Registrar(&mockRegistry{service: make(map[string]*registry.ServiceInstance)}),
) )
time.AfterFunc(time.Second, func() { time.AfterFunc(time.Second, func() {

@ -0,0 +1,86 @@
package health
import (
"context"
"sync"
"time"
"golang.org/x/sync/errgroup"
)
type Health struct {
mutex sync.RWMutex
// key: service name, type is string
// value: service Status type is Status
statusMap sync.Map
updates map[string]map[string]chan Status
ticker *time.Ticker
}
func New(opts ...Option) *Health {
option := options{
watchTime: time.Second * 5,
}
for _, o := range opts {
o(&option)
}
h := &Health{
ticker: time.NewTicker(option.watchTime),
updates: map[string]map[string]chan Status{},
}
return h
}
func (h *Health) GetStatus(service string) (Status, bool) {
status, ok := h.statusMap.Load(service)
if !ok {
return Status_UNKNOWN, false
}
return status.(Status), ok
}
func (h *Health) SetStatus(service string, status Status) error {
h.statusMap.Store(service, status)
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
defer cancel()
eg, ctx := errgroup.WithContext(ctx)
h.mutex.RLock()
for _, w := range h.updates[service] {
ch := w
eg.Go(func() error {
select {
case ch <- status:
case <-ctx.Done():
return ctx.Err()
}
return nil
})
}
h.mutex.RUnlock()
return eg.Wait()
}
func (h *Health) Update(service string, id string) chan Status {
h.mutex.RLock()
defer h.mutex.RUnlock()
if _, ok := h.updates[service]; !ok {
h.updates[service] = make(map[string]chan Status)
}
if _, ok := h.updates[service][id]; !ok {
h.updates[service][id] = make(chan Status, 1)
}
return h.updates[service][id]
}
func (h *Health) DelUpdate(service string, id string) {
h.mutex.Lock()
defer h.mutex.Unlock()
if _, ok := h.updates[service]; ok {
delete(h.updates[service], id)
}
}
func (h *Health) Ticker() *time.Ticker {
return h.ticker
}

@ -0,0 +1,137 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.27.1
// protoc v3.17.3
// source: health/health.proto
package health
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type Status int32
const (
Status_UNKNOWN Status = 0
Status_SERVING Status = 1
Status_NOT_SERVING Status = 2
Status_SERVICE_UNKNOWN Status = 3
)
// Enum value maps for Status.
var (
Status_name = map[int32]string{
0: "UNKNOWN",
1: "SERVING",
2: "NOT_SERVING",
3: "SERVICE_UNKNOWN",
}
Status_value = map[string]int32{
"UNKNOWN": 0,
"SERVING": 1,
"NOT_SERVING": 2,
"SERVICE_UNKNOWN": 3,
}
)
func (x Status) Enum() *Status {
p := new(Status)
*p = x
return p
}
func (x Status) String() string {
return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
}
func (Status) Descriptor() protoreflect.EnumDescriptor {
return file_health_health_proto_enumTypes[0].Descriptor()
}
func (Status) Type() protoreflect.EnumType {
return &file_health_health_proto_enumTypes[0]
}
func (x Status) Number() protoreflect.EnumNumber {
return protoreflect.EnumNumber(x)
}
// Deprecated: Use Status.Descriptor instead.
func (Status) EnumDescriptor() ([]byte, []int) {
return file_health_health_proto_rawDescGZIP(), []int{0}
}
var File_health_health_proto protoreflect.FileDescriptor
var file_health_health_proto_rawDesc = []byte{
0x0a, 0x13, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2f, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2a, 0x48, 0x0a,
0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f,
0x57, 0x4e, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x53, 0x45, 0x52, 0x56, 0x49, 0x4e, 0x47, 0x10,
0x01, 0x12, 0x0f, 0x0a, 0x0b, 0x4e, 0x4f, 0x54, 0x5f, 0x53, 0x45, 0x52, 0x56, 0x49, 0x4e, 0x47,
0x10, 0x02, 0x12, 0x13, 0x0a, 0x0f, 0x53, 0x45, 0x52, 0x56, 0x49, 0x43, 0x45, 0x5f, 0x55, 0x4e,
0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x03, 0x42, 0x2e, 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75,
0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x6f, 0x2d, 0x6b, 0x72, 0x61, 0x74, 0x6f, 0x73, 0x2f,
0x6b, 0x72, 0x61, 0x74, 0x6f, 0x73, 0x2f, 0x76, 0x32, 0x2f, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68,
0x3b, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_health_health_proto_rawDescOnce sync.Once
file_health_health_proto_rawDescData = file_health_health_proto_rawDesc
)
func file_health_health_proto_rawDescGZIP() []byte {
file_health_health_proto_rawDescOnce.Do(func() {
file_health_health_proto_rawDescData = protoimpl.X.CompressGZIP(file_health_health_proto_rawDescData)
})
return file_health_health_proto_rawDescData
}
var file_health_health_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_health_health_proto_goTypes = []interface{}{
(Status)(0), // 0: health.Status
}
var file_health_health_proto_depIdxs = []int32{
0, // [0:0] is the sub-list for method output_type
0, // [0:0] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_health_health_proto_init() }
func file_health_health_proto_init() {
if File_health_health_proto != nil {
return
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_health_health_proto_rawDesc,
NumEnums: 1,
NumMessages: 0,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_health_health_proto_goTypes,
DependencyIndexes: file_health_health_proto_depIdxs,
EnumInfos: file_health_health_proto_enumTypes,
}.Build()
File_health_health_proto = out.File
file_health_health_proto_rawDesc = nil
file_health_health_proto_goTypes = nil
file_health_health_proto_depIdxs = nil
}

@ -0,0 +1,12 @@
syntax = "proto3";
package health;
option go_package = "github.com/go-kratos/kratos/v2/health;health";
enum Status {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
SERVICE_UNKNOWN = 3;
}

@ -0,0 +1,15 @@
package health
import "time"
type Option func(*options)
type options struct {
watchTime time.Duration
}
func WithWatchTime(t time.Duration) Option {
return func(o *options) {
o.watchTime = t
}
}

@ -0,0 +1,99 @@
package health
import (
"context"
"github.com/go-kratos/kratos/v2"
"github.com/go-kratos/kratos/v2/errors"
"github.com/go-kratos/kratos/v2/health"
"github.com/google/uuid"
pb "google.golang.org/grpc/health/grpc_health_v1"
)
type Server struct {
pb.UnimplementedHealthServer
}
func NewServer() *Server {
return &Server{}
}
func (s *Server) Check(ctx context.Context, req *pb.HealthCheckRequest) (*pb.HealthCheckResponse, error) {
info, ok := kratos.FromContext(ctx)
if !ok {
return nil, errors.InternalServer("kratos.FromContext(ctx) failed", "no info found in context")
}
status, ok := info.Health().GetStatus(req.Service)
if !ok {
status = health.Status_UNKNOWN
}
var sv pb.HealthCheckResponse_ServingStatus
switch status {
case health.Status_SERVING:
sv = pb.HealthCheckResponse_SERVING
case health.Status_NOT_SERVING:
sv = pb.HealthCheckResponse_NOT_SERVING
case health.Status_SERVICE_UNKNOWN:
sv = pb.HealthCheckResponse_SERVICE_UNKNOWN
default:
sv = pb.HealthCheckResponse_NOT_SERVING
}
return &pb.HealthCheckResponse{
Status: sv,
}, nil
}
func (s *Server) Watch(req *pb.HealthCheckRequest, ss pb.Health_WatchServer) (err error) {
ctx := ss.Context()
info, ok := kratos.FromContext(ctx)
if !ok {
return errors.InternalServer("get info failed", "")
}
uid, err := uuid.NewUUID()
if err != nil {
return errors.InternalServer("new uuid failed", err.Error())
}
update := info.Health().Update(req.Service, uid.String())
defer info.Health().DelUpdate(req.Service, uid.String())
status, ok := info.Health().GetStatus(req.Service)
if !ok {
update <- health.Status_SERVICE_UNKNOWN
} else {
update <- status
}
ticker := info.Health().Ticker()
for {
select {
case <-ctx.Done():
return nil
case status = <-update:
err := send(ss, status)
if err != nil {
return err
}
case <-ticker.C:
err := send(ss, status)
if err != nil {
return err
}
}
}
}
func send(ss pb.Health_WatchServer, status health.Status) error {
var sv pb.HealthCheckResponse_ServingStatus
switch status {
case health.Status_SERVING:
sv = pb.HealthCheckResponse_SERVING
case health.Status_NOT_SERVING:
sv = pb.HealthCheckResponse_NOT_SERVING
case health.Status_SERVICE_UNKNOWN:
sv = pb.HealthCheckResponse_SERVICE_UNKNOWN
default:
sv = pb.HealthCheckResponse_NOT_SERVING
}
reply := &pb.HealthCheckResponse{
Status: sv,
}
return ss.Send(reply)
}

@ -16,9 +16,9 @@ import (
"github.com/go-kratos/kratos/v2/middleware" "github.com/go-kratos/kratos/v2/middleware"
"github.com/go-kratos/kratos/v2/transport" "github.com/go-kratos/kratos/v2/transport"
"github.com/go-kratos/kratos/v2/transport/grpc/health"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection" "google.golang.org/grpc/reflection"
@ -185,14 +185,14 @@ func (s *Server) Start(ctx context.Context) error {
} }
s.baseCtx = ctx s.baseCtx = ctx
s.log.Infof("[gRPC] server listening on: %s", s.lis.Addr().String()) s.log.Infof("[gRPC] server listening on: %s", s.lis.Addr().String())
s.health.Resume() // s.health.Resume()
return s.Serve(s.lis) return s.Serve(s.lis)
} }
// Stop stop the gRPC server. // Stop stop the gRPC server.
func (s *Server) Stop(ctx context.Context) error { func (s *Server) Stop(ctx context.Context) error {
s.health.Shutdown()
s.GracefulStop() s.GracefulStop()
// s.health.Shutdown()
s.log.Info("[gRPC] server stopping") s.log.Info("[gRPC] server stopping")
return nil return nil
} }

Loading…
Cancel
Save