feat:add grpc health check

feat_health_check
haiyux 3 years ago
parent 9bf178b1d1
commit 2f74c9dfae
  1. 19
      app.go
  2. 8
      app_test.go
  3. 44
      examples/health/client/main.go
  4. 59
      examples/health/server/main.go
  5. 83
      health/health.go
  6. 137
      health/health.pb.go
  7. 12
      health/health.proto
  8. 15
      health/option.go
  9. 99
      transport/grpc/health/health.go
  10. 6
      transport/grpc/server.go

@ -9,6 +9,7 @@ import (
"syscall" "syscall"
"time" "time"
"github.com/go-kratos/kratos/v2/health"
"github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/log"
"github.com/go-kratos/kratos/v2/registry" "github.com/go-kratos/kratos/v2/registry"
"github.com/go-kratos/kratos/v2/transport" "github.com/go-kratos/kratos/v2/transport"
@ -24,6 +25,7 @@ type AppInfo interface {
Version() string Version() string
Metadata() map[string]string Metadata() map[string]string
Endpoint() []string Endpoint() []string
Health() *health.Health
} }
// App is an application components lifecycle manager. // App is an application components lifecycle manager.
@ -33,6 +35,7 @@ type App struct {
cancel func() cancel func()
lk sync.Mutex lk sync.Mutex
instance *registry.ServiceInstance instance *registry.ServiceInstance
health *health.Health
} }
// New create an application lifecycle manager. // New create an application lifecycle manager.
@ -55,6 +58,7 @@ func New(opts ...Option) *App {
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
opts: o, opts: o,
health: health.New(),
} }
} }
@ -78,6 +82,9 @@ func (a *App) Endpoint() []string {
return a.instance.Endpoints return a.instance.Endpoints
} }
// Health returns health
func (a *App) Health() *health.Health { return a.health }
// Run executes all OnStart hooks registered with the application's Lifecycle. // Run executes all OnStart hooks registered with the application's Lifecycle.
func (a *App) Run() error { func (a *App) Run() error {
instance, err := a.buildInstance() instance, err := a.buildInstance()
@ -102,6 +109,10 @@ func (a *App) Run() error {
}) })
} }
wg.Wait() wg.Wait()
err = a.health.SetStatus(a.Name(), health.Status_SERVING)
if err != nil {
log.Errorf("set health status error: %v", err)
}
if a.opts.registrar != nil { if a.opts.registrar != nil {
rctx, rcancel := context.WithTimeout(a.opts.ctx, a.opts.registrarTimeout) rctx, rcancel := context.WithTimeout(a.opts.ctx, a.opts.registrarTimeout)
defer rcancel() defer rcancel()
@ -118,6 +129,10 @@ func (a *App) Run() error {
for { for {
select { select {
case <-ctx.Done(): case <-ctx.Done():
err = a.health.SetStatus(a.Name(), health.Status_NOT_SERVING)
if err != nil {
log.Errorf("set health status error: %v", err)
}
return ctx.Err() return ctx.Err()
case <-c: case <-c:
err := a.Stop() err := a.Stop()
@ -136,6 +151,10 @@ func (a *App) Run() error {
// Stop gracefully stops the application. // Stop gracefully stops the application.
func (a *App) Stop() error { func (a *App) Stop() error {
err := a.health.SetStatus(a.Name(), health.Status_NOT_SERVING)
if err != nil {
log.Errorf("set health status error: %v", err)
}
a.lk.Lock() a.lk.Lock()
instance := a.instance instance := a.instance
a.lk.Unlock() a.lk.Unlock()

@ -9,8 +9,6 @@ import (
"time" "time"
"github.com/go-kratos/kratos/v2/registry" "github.com/go-kratos/kratos/v2/registry"
"github.com/go-kratos/kratos/v2/transport/grpc"
"github.com/go-kratos/kratos/v2/transport/http"
) )
type mockRegistry struct { type mockRegistry struct {
@ -40,12 +38,12 @@ func (r *mockRegistry) Deregister(ctx context.Context, service *registry.Service
} }
func TestApp(t *testing.T) { func TestApp(t *testing.T) {
hs := http.NewServer() // hs := http.NewServer()
gs := grpc.NewServer() // gs := grpc.NewServer()
app := New( app := New(
Name("kratos"), Name("kratos"),
Version("v1.0.0"), Version("v1.0.0"),
Server(hs, gs), // Server(hs, gs),
Registrar(&mockRegistry{service: make(map[string]*registry.ServiceInstance)}), Registrar(&mockRegistry{service: make(map[string]*registry.ServiceInstance)}),
) )
time.AfterFunc(time.Second, func() { time.AfterFunc(time.Second, func() {

@ -0,0 +1,44 @@
package main
import (
"context"
"fmt"
"io"
"log"
"github.com/go-kratos/kratos/v2/middleware/recovery"
transgrpc "github.com/go-kratos/kratos/v2/transport/grpc"
pb "google.golang.org/grpc/health/grpc_health_v1"
)
func main() {
conn, err := transgrpc.DialInsecure(
context.Background(),
transgrpc.WithEndpoint("127.0.0.1:9010"),
transgrpc.WithMiddleware(
recovery.Recovery(),
),
)
if err != nil {
panic(err)
}
defer conn.Close()
client := pb.NewHealthClient(conn)
stream, err := client.Watch(context.Background(), &pb.HealthCheckRequest{Service: "health"})
if err != nil {
log.Fatal(err)
}
for {
res, err := stream.Recv()
if err == io.EOF {
fmt.Println("EOF")
break
}
if err != nil {
log.Fatalf("ListStr get stream err: %v", err)
}
// 打印返回值
log.Println(res.Status)
}
}

@ -0,0 +1,59 @@
package main
import (
"context"
"fmt"
"log"
"github.com/go-kratos/kratos/examples/helloworld/helloworld"
"github.com/go-kratos/kratos/v2"
"github.com/go-kratos/kratos/v2/errors"
"github.com/go-kratos/kratos/v2/middleware/recovery"
"github.com/go-kratos/kratos/v2/transport/grpc"
)
// go build -ldflags "-X main.Version=x.y.z"
var (
// Name is the name of the compiled software.
Name = "health"
// Version is the version of the compiled software.
// Version = "v1.0.0"
)
// server is used to implement helloworld.GreeterServer.
type server struct {
helloworld.UnimplementedGreeterServer
}
// SayHello implements helloworld.GreeterServer
func (s *server) SayHello(ctx context.Context, in *helloworld.HelloRequest) (*helloworld.HelloReply, error) {
if in.Name == "error" {
return nil, errors.BadRequest("custom_error", fmt.Sprintf("invalid argument %s", in.Name))
}
if in.Name == "panic" {
panic("server panic")
}
return &helloworld.HelloReply{Message: fmt.Sprintf("Hello %+v", in.Name)}, nil
}
func main() {
s := &server{}
grpcSrv := grpc.NewServer(
grpc.Address(":9010"),
grpc.Middleware(
recovery.Recovery(),
),
)
helloworld.RegisterGreeterServer(grpcSrv, s)
app := kratos.New(
kratos.Name(Name),
kratos.Server(
grpcSrv,
),
)
if err := app.Run(); err != nil {
log.Fatal(err)
}
}

@ -0,0 +1,83 @@
package health
import (
"context"
"sync"
"time"
"golang.org/x/sync/errgroup"
)
type Health struct {
// key: service name, type is string
// value: service Status type is Status
statusMap sync.Map
// key: service name, type is string
// value: update, type is sync.Map
// update: key is id, type is string, value: chan Status
updates sync.Map
ticker *time.Ticker
}
func New(opts ...Option) *Health {
option := options{
watchTime: time.Second * 5,
}
for _, o := range opts {
o(&option)
}
h := &Health{
ticker: time.NewTicker(option.watchTime),
}
return h
}
func (h *Health) GetStatus(service string) (Status, bool) {
status, ok := h.statusMap.Load(service)
if !ok {
return Status_UNKNOWN, false
}
return status.(Status), ok
}
func (h *Health) SetStatus(service string, status Status) error {
h.statusMap.Store(service, status)
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100)
defer cancel()
eg, ctx := errgroup.WithContext(ctx)
u, _ := h.updates.LoadOrStore(service, sync.Map{})
update := u.(sync.Map)
update.Range(func(key, value interface{}) bool {
ch := value.(chan Status)
eg.Go(func() error {
select {
case ch <- status:
case <-ctx.Done():
return ctx.Err()
}
return nil
})
return true
})
return eg.Wait()
}
func (h *Health) Update(service string, id string) chan Status {
u, _ := h.updates.LoadOrStore(service, sync.Map{})
update := u.(sync.Map)
ch, _ := update.LoadOrStore(id, make(chan Status, 1))
return ch.(chan Status)
}
func (h *Health) DelUpdate(service string, id string) {
if u, ok := h.updates.Load(service); ok {
update := u.(sync.Map)
update.Delete(id)
}
}
func (h *Health) Ticker() *time.Ticker {
return h.ticker
}

@ -0,0 +1,137 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
// protoc-gen-go v1.27.1
// protoc v3.17.3
// source: health/health.proto
package health
import (
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
reflect "reflect"
sync "sync"
)
const (
// Verify that this generated code is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
// Verify that runtime/protoimpl is sufficiently up-to-date.
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
type Status int32
const (
Status_UNKNOWN Status = 0
Status_SERVING Status = 1
Status_NOT_SERVING Status = 2
Status_SERVICE_UNKNOWN Status = 3
)
// Enum value maps for Status.
var (
Status_name = map[int32]string{
0: "UNKNOWN",
1: "SERVING",
2: "NOT_SERVING",
3: "SERVICE_UNKNOWN",
}
Status_value = map[string]int32{
"UNKNOWN": 0,
"SERVING": 1,
"NOT_SERVING": 2,
"SERVICE_UNKNOWN": 3,
}
)
func (x Status) Enum() *Status {
p := new(Status)
*p = x
return p
}
func (x Status) String() string {
return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x))
}
func (Status) Descriptor() protoreflect.EnumDescriptor {
return file_health_health_proto_enumTypes[0].Descriptor()
}
func (Status) Type() protoreflect.EnumType {
return &file_health_health_proto_enumTypes[0]
}
func (x Status) Number() protoreflect.EnumNumber {
return protoreflect.EnumNumber(x)
}
// Deprecated: Use Status.Descriptor instead.
func (Status) EnumDescriptor() ([]byte, []int) {
return file_health_health_proto_rawDescGZIP(), []int{0}
}
var File_health_health_proto protoreflect.FileDescriptor
var file_health_health_proto_rawDesc = []byte{
0x0a, 0x13, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2f, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2a, 0x48, 0x0a,
0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f,
0x57, 0x4e, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x53, 0x45, 0x52, 0x56, 0x49, 0x4e, 0x47, 0x10,
0x01, 0x12, 0x0f, 0x0a, 0x0b, 0x4e, 0x4f, 0x54, 0x5f, 0x53, 0x45, 0x52, 0x56, 0x49, 0x4e, 0x47,
0x10, 0x02, 0x12, 0x13, 0x0a, 0x0f, 0x53, 0x45, 0x52, 0x56, 0x49, 0x43, 0x45, 0x5f, 0x55, 0x4e,
0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x03, 0x42, 0x2e, 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75,
0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x6f, 0x2d, 0x6b, 0x72, 0x61, 0x74, 0x6f, 0x73, 0x2f,
0x6b, 0x72, 0x61, 0x74, 0x6f, 0x73, 0x2f, 0x76, 0x32, 0x2f, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68,
0x3b, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
file_health_health_proto_rawDescOnce sync.Once
file_health_health_proto_rawDescData = file_health_health_proto_rawDesc
)
func file_health_health_proto_rawDescGZIP() []byte {
file_health_health_proto_rawDescOnce.Do(func() {
file_health_health_proto_rawDescData = protoimpl.X.CompressGZIP(file_health_health_proto_rawDescData)
})
return file_health_health_proto_rawDescData
}
var file_health_health_proto_enumTypes = make([]protoimpl.EnumInfo, 1)
var file_health_health_proto_goTypes = []interface{}{
(Status)(0), // 0: health.Status
}
var file_health_health_proto_depIdxs = []int32{
0, // [0:0] is the sub-list for method output_type
0, // [0:0] is the sub-list for method input_type
0, // [0:0] is the sub-list for extension type_name
0, // [0:0] is the sub-list for extension extendee
0, // [0:0] is the sub-list for field type_name
}
func init() { file_health_health_proto_init() }
func file_health_health_proto_init() {
if File_health_health_proto != nil {
return
}
type x struct{}
out := protoimpl.TypeBuilder{
File: protoimpl.DescBuilder{
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
RawDescriptor: file_health_health_proto_rawDesc,
NumEnums: 1,
NumMessages: 0,
NumExtensions: 0,
NumServices: 0,
},
GoTypes: file_health_health_proto_goTypes,
DependencyIndexes: file_health_health_proto_depIdxs,
EnumInfos: file_health_health_proto_enumTypes,
}.Build()
File_health_health_proto = out.File
file_health_health_proto_rawDesc = nil
file_health_health_proto_goTypes = nil
file_health_health_proto_depIdxs = nil
}

@ -0,0 +1,12 @@
syntax = "proto3";
package health;
option go_package = "github.com/go-kratos/kratos/v2/health;health";
enum Status {
UNKNOWN = 0;
SERVING = 1;
NOT_SERVING = 2;
SERVICE_UNKNOWN = 3;
}

@ -0,0 +1,15 @@
package health
import "time"
type Option func(*options)
type options struct {
watchTime time.Duration
}
func WithWatchTime(t time.Duration) Option {
return func(o *options) {
o.watchTime = t
}
}

@ -0,0 +1,99 @@
package health
import (
"context"
"github.com/go-kratos/kratos/v2"
"github.com/go-kratos/kratos/v2/errors"
"github.com/go-kratos/kratos/v2/health"
"github.com/google/uuid"
pb "google.golang.org/grpc/health/grpc_health_v1"
)
type Server struct {
pb.UnimplementedHealthServer
}
func NewServer() *Server {
return &Server{}
}
func (s *Server) Check(ctx context.Context, req *pb.HealthCheckRequest) (*pb.HealthCheckResponse, error) {
info, ok := kratos.FromContext(ctx)
if !ok {
return nil, errors.InternalServer("kratos.FromContext(ctx) failed", "no info found in context")
}
status, ok := info.Health().GetStatus(req.Service)
if !ok {
status = health.Status_UNKNOWN
}
var sv pb.HealthCheckResponse_ServingStatus
switch status {
case health.Status_SERVING:
sv = pb.HealthCheckResponse_SERVING
case health.Status_NOT_SERVING:
sv = pb.HealthCheckResponse_NOT_SERVING
case health.Status_SERVICE_UNKNOWN:
sv = pb.HealthCheckResponse_SERVICE_UNKNOWN
default:
sv = pb.HealthCheckResponse_NOT_SERVING
}
return &pb.HealthCheckResponse{
Status: sv,
}, nil
}
func (s *Server) Watch(req *pb.HealthCheckRequest, ss pb.Health_WatchServer) (err error) {
ctx := ss.Context()
info, ok := kratos.FromContext(ctx)
if !ok {
return errors.InternalServer("get info failed", "")
}
uid, err := uuid.NewUUID()
if err != nil {
return errors.InternalServer("new uuid failed", err.Error())
}
update := info.Health().Update(req.Service, uid.String())
defer info.Health().DelUpdate(req.Service, uid.String())
status, ok := info.Health().GetStatus(req.Service)
if !ok {
update <- health.Status_SERVICE_UNKNOWN
} else {
update <- status
}
ticker := info.Health().Ticker()
for {
select {
case <-ctx.Done():
return nil
case status = <-update:
err := send(ss, status)
if err != nil {
return err
}
case <-ticker.C:
err := send(ss, status)
if err != nil {
return err
}
}
}
}
func send(ss pb.Health_WatchServer, status health.Status) error {
var sv pb.HealthCheckResponse_ServingStatus
switch status {
case health.Status_SERVING:
sv = pb.HealthCheckResponse_SERVING
case health.Status_NOT_SERVING:
sv = pb.HealthCheckResponse_NOT_SERVING
case health.Status_SERVICE_UNKNOWN:
sv = pb.HealthCheckResponse_SERVICE_UNKNOWN
default:
sv = pb.HealthCheckResponse_NOT_SERVING
}
reply := &pb.HealthCheckResponse{
Status: sv,
}
return ss.Send(reply)
}

@ -16,9 +16,9 @@ import (
"github.com/go-kratos/kratos/v2/middleware" "github.com/go-kratos/kratos/v2/middleware"
"github.com/go-kratos/kratos/v2/transport" "github.com/go-kratos/kratos/v2/transport"
"github.com/go-kratos/kratos/v2/transport/grpc/health"
"google.golang.org/grpc" "google.golang.org/grpc"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/health/grpc_health_v1"
"google.golang.org/grpc/reflection" "google.golang.org/grpc/reflection"
@ -185,14 +185,14 @@ func (s *Server) Start(ctx context.Context) error {
} }
s.baseCtx = ctx s.baseCtx = ctx
s.log.Infof("[gRPC] server listening on: %s", s.lis.Addr().String()) s.log.Infof("[gRPC] server listening on: %s", s.lis.Addr().String())
s.health.Resume() // s.health.Resume()
return s.Serve(s.lis) return s.Serve(s.lis)
} }
// Stop stop the gRPC server. // Stop stop the gRPC server.
func (s *Server) Stop(ctx context.Context) error { func (s *Server) Stop(ctx context.Context) error {
s.GracefulStop() s.GracefulStop()
s.health.Shutdown() // s.health.Shutdown()
s.log.Info("[gRPC] server stopping") s.log.Info("[gRPC] server stopping")
return nil return nil
} }

Loading…
Cancel
Save