diff --git a/app.go b/app.go index 8418af429..d2c94491b 100644 --- a/app.go +++ b/app.go @@ -9,6 +9,7 @@ import ( "syscall" "time" + "github.com/go-kratos/kratos/v2/health" "github.com/go-kratos/kratos/v2/log" "github.com/go-kratos/kratos/v2/registry" "github.com/go-kratos/kratos/v2/transport" @@ -24,6 +25,7 @@ type AppInfo interface { Version() string Metadata() map[string]string Endpoint() []string + Health() *health.Health } // App is an application components lifecycle manager. @@ -33,6 +35,7 @@ type App struct { cancel func() lk sync.Mutex instance *registry.ServiceInstance + health *health.Health } // New create an application lifecycle manager. @@ -55,6 +58,7 @@ func New(opts ...Option) *App { ctx: ctx, cancel: cancel, opts: o, + health: health.New(), } } @@ -78,6 +82,9 @@ func (a *App) Endpoint() []string { return a.instance.Endpoints } +// Health returns health +func (a *App) Health() *health.Health { return a.health } + // Run executes all OnStart hooks registered with the application's Lifecycle. func (a *App) Run() error { instance, err := a.buildInstance() @@ -102,6 +109,10 @@ func (a *App) Run() error { }) } wg.Wait() + err = a.health.SetStatus(a.Name(), health.Status_SERVING) + if err != nil { + log.Errorf("set health status error: %v", err) + } if a.opts.registrar != nil { rctx, rcancel := context.WithTimeout(a.opts.ctx, a.opts.registrarTimeout) defer rcancel() @@ -118,6 +129,10 @@ func (a *App) Run() error { for { select { case <-ctx.Done(): + err = a.health.SetStatus(a.Name(), health.Status_NOT_SERVING) + if err != nil { + log.Errorf("set health status error: %v", err) + } return ctx.Err() case <-c: err := a.Stop() @@ -136,6 +151,10 @@ func (a *App) Run() error { // Stop gracefully stops the application. func (a *App) Stop() error { + err := a.health.SetStatus(a.Name(), health.Status_NOT_SERVING) + if err != nil { + log.Errorf("set health status error: %v", err) + } a.lk.Lock() instance := a.instance a.lk.Unlock() diff --git a/app_test.go b/app_test.go index 9165ce075..7c37e45e0 100644 --- a/app_test.go +++ b/app_test.go @@ -9,8 +9,6 @@ import ( "time" "github.com/go-kratos/kratos/v2/registry" - "github.com/go-kratos/kratos/v2/transport/grpc" - "github.com/go-kratos/kratos/v2/transport/http" ) type mockRegistry struct { @@ -40,12 +38,12 @@ func (r *mockRegistry) Deregister(ctx context.Context, service *registry.Service } func TestApp(t *testing.T) { - hs := http.NewServer() - gs := grpc.NewServer() + // hs := http.NewServer() + // gs := grpc.NewServer() app := New( Name("kratos"), Version("v1.0.0"), - Server(hs, gs), + // Server(hs, gs), Registrar(&mockRegistry{service: make(map[string]*registry.ServiceInstance)}), ) time.AfterFunc(time.Second, func() { diff --git a/examples/health/client/main.go b/examples/health/client/main.go new file mode 100644 index 000000000..c4fb2f746 --- /dev/null +++ b/examples/health/client/main.go @@ -0,0 +1,44 @@ +package main + +import ( + "context" + "fmt" + "io" + "log" + + "github.com/go-kratos/kratos/v2/middleware/recovery" + transgrpc "github.com/go-kratos/kratos/v2/transport/grpc" + pb "google.golang.org/grpc/health/grpc_health_v1" +) + +func main() { + conn, err := transgrpc.DialInsecure( + context.Background(), + transgrpc.WithEndpoint("127.0.0.1:9010"), + transgrpc.WithMiddleware( + recovery.Recovery(), + ), + ) + if err != nil { + panic(err) + } + defer conn.Close() + + client := pb.NewHealthClient(conn) + stream, err := client.Watch(context.Background(), &pb.HealthCheckRequest{Service: "health"}) + if err != nil { + log.Fatal(err) + } + for { + res, err := stream.Recv() + if err == io.EOF { + fmt.Println("EOF") + break + } + if err != nil { + log.Fatalf("ListStr get stream err: %v", err) + } + // 打印返回值 + log.Println(res.Status) + } +} diff --git a/examples/health/server/main.go b/examples/health/server/main.go new file mode 100644 index 000000000..7beebdd46 --- /dev/null +++ b/examples/health/server/main.go @@ -0,0 +1,59 @@ +package main + +import ( + "context" + "fmt" + "log" + + "github.com/go-kratos/kratos/examples/helloworld/helloworld" + "github.com/go-kratos/kratos/v2" + "github.com/go-kratos/kratos/v2/errors" + "github.com/go-kratos/kratos/v2/middleware/recovery" + "github.com/go-kratos/kratos/v2/transport/grpc" +) + +// go build -ldflags "-X main.Version=x.y.z" +var ( + // Name is the name of the compiled software. + Name = "health" + // Version is the version of the compiled software. + // Version = "v1.0.0" +) + +// server is used to implement helloworld.GreeterServer. +type server struct { + helloworld.UnimplementedGreeterServer +} + +// SayHello implements helloworld.GreeterServer +func (s *server) SayHello(ctx context.Context, in *helloworld.HelloRequest) (*helloworld.HelloReply, error) { + if in.Name == "error" { + return nil, errors.BadRequest("custom_error", fmt.Sprintf("invalid argument %s", in.Name)) + } + if in.Name == "panic" { + panic("server panic") + } + return &helloworld.HelloReply{Message: fmt.Sprintf("Hello %+v", in.Name)}, nil +} + +func main() { + s := &server{} + grpcSrv := grpc.NewServer( + grpc.Address(":9010"), + grpc.Middleware( + recovery.Recovery(), + ), + ) + helloworld.RegisterGreeterServer(grpcSrv, s) + + app := kratos.New( + kratos.Name(Name), + kratos.Server( + grpcSrv, + ), + ) + + if err := app.Run(); err != nil { + log.Fatal(err) + } +} diff --git a/health/health.go b/health/health.go new file mode 100644 index 000000000..f26fd02d0 --- /dev/null +++ b/health/health.go @@ -0,0 +1,83 @@ +package health + +import ( + "context" + "sync" + "time" + + "golang.org/x/sync/errgroup" +) + +type Health struct { + // key: service name, type is string + // value: service Status type is Status + statusMap sync.Map + // key: service name, type is string + // value: update, type is sync.Map + // update: key is id, type is string, value: chan Status + updates sync.Map + ticker *time.Ticker +} + +func New(opts ...Option) *Health { + option := options{ + watchTime: time.Second * 5, + } + for _, o := range opts { + o(&option) + } + h := &Health{ + ticker: time.NewTicker(option.watchTime), + } + return h +} + +func (h *Health) GetStatus(service string) (Status, bool) { + status, ok := h.statusMap.Load(service) + if !ok { + return Status_UNKNOWN, false + } + return status.(Status), ok +} + +func (h *Health) SetStatus(service string, status Status) error { + h.statusMap.Store(service, status) + ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*100) + defer cancel() + eg, ctx := errgroup.WithContext(ctx) + u, _ := h.updates.LoadOrStore(service, sync.Map{}) + update := u.(sync.Map) + + update.Range(func(key, value interface{}) bool { + ch := value.(chan Status) + eg.Go(func() error { + select { + case ch <- status: + case <-ctx.Done(): + return ctx.Err() + } + return nil + }) + return true + }) + + return eg.Wait() +} + +func (h *Health) Update(service string, id string) chan Status { + u, _ := h.updates.LoadOrStore(service, sync.Map{}) + update := u.(sync.Map) + ch, _ := update.LoadOrStore(id, make(chan Status, 1)) + return ch.(chan Status) +} + +func (h *Health) DelUpdate(service string, id string) { + if u, ok := h.updates.Load(service); ok { + update := u.(sync.Map) + update.Delete(id) + } +} + +func (h *Health) Ticker() *time.Ticker { + return h.ticker +} diff --git a/health/health.pb.go b/health/health.pb.go new file mode 100644 index 000000000..abee3bf19 --- /dev/null +++ b/health/health.pb.go @@ -0,0 +1,137 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.27.1 +// protoc v3.17.3 +// source: health/health.proto + +package health + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type Status int32 + +const ( + Status_UNKNOWN Status = 0 + Status_SERVING Status = 1 + Status_NOT_SERVING Status = 2 + Status_SERVICE_UNKNOWN Status = 3 +) + +// Enum value maps for Status. +var ( + Status_name = map[int32]string{ + 0: "UNKNOWN", + 1: "SERVING", + 2: "NOT_SERVING", + 3: "SERVICE_UNKNOWN", + } + Status_value = map[string]int32{ + "UNKNOWN": 0, + "SERVING": 1, + "NOT_SERVING": 2, + "SERVICE_UNKNOWN": 3, + } +) + +func (x Status) Enum() *Status { + p := new(Status) + *p = x + return p +} + +func (x Status) String() string { + return protoimpl.X.EnumStringOf(x.Descriptor(), protoreflect.EnumNumber(x)) +} + +func (Status) Descriptor() protoreflect.EnumDescriptor { + return file_health_health_proto_enumTypes[0].Descriptor() +} + +func (Status) Type() protoreflect.EnumType { + return &file_health_health_proto_enumTypes[0] +} + +func (x Status) Number() protoreflect.EnumNumber { + return protoreflect.EnumNumber(x) +} + +// Deprecated: Use Status.Descriptor instead. +func (Status) EnumDescriptor() ([]byte, []int) { + return file_health_health_proto_rawDescGZIP(), []int{0} +} + +var File_health_health_proto protoreflect.FileDescriptor + +var file_health_health_proto_rawDesc = []byte{ + 0x0a, 0x13, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2f, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2e, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x06, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x2a, 0x48, 0x0a, + 0x06, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x12, 0x0b, 0x0a, 0x07, 0x55, 0x4e, 0x4b, 0x4e, 0x4f, + 0x57, 0x4e, 0x10, 0x00, 0x12, 0x0b, 0x0a, 0x07, 0x53, 0x45, 0x52, 0x56, 0x49, 0x4e, 0x47, 0x10, + 0x01, 0x12, 0x0f, 0x0a, 0x0b, 0x4e, 0x4f, 0x54, 0x5f, 0x53, 0x45, 0x52, 0x56, 0x49, 0x4e, 0x47, + 0x10, 0x02, 0x12, 0x13, 0x0a, 0x0f, 0x53, 0x45, 0x52, 0x56, 0x49, 0x43, 0x45, 0x5f, 0x55, 0x4e, + 0x4b, 0x4e, 0x4f, 0x57, 0x4e, 0x10, 0x03, 0x42, 0x2e, 0x5a, 0x2c, 0x67, 0x69, 0x74, 0x68, 0x75, + 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x6f, 0x2d, 0x6b, 0x72, 0x61, 0x74, 0x6f, 0x73, 0x2f, + 0x6b, 0x72, 0x61, 0x74, 0x6f, 0x73, 0x2f, 0x76, 0x32, 0x2f, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, + 0x3b, 0x68, 0x65, 0x61, 0x6c, 0x74, 0x68, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_health_health_proto_rawDescOnce sync.Once + file_health_health_proto_rawDescData = file_health_health_proto_rawDesc +) + +func file_health_health_proto_rawDescGZIP() []byte { + file_health_health_proto_rawDescOnce.Do(func() { + file_health_health_proto_rawDescData = protoimpl.X.CompressGZIP(file_health_health_proto_rawDescData) + }) + return file_health_health_proto_rawDescData +} + +var file_health_health_proto_enumTypes = make([]protoimpl.EnumInfo, 1) +var file_health_health_proto_goTypes = []interface{}{ + (Status)(0), // 0: health.Status +} +var file_health_health_proto_depIdxs = []int32{ + 0, // [0:0] is the sub-list for method output_type + 0, // [0:0] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_health_health_proto_init() } +func file_health_health_proto_init() { + if File_health_health_proto != nil { + return + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_health_health_proto_rawDesc, + NumEnums: 1, + NumMessages: 0, + NumExtensions: 0, + NumServices: 0, + }, + GoTypes: file_health_health_proto_goTypes, + DependencyIndexes: file_health_health_proto_depIdxs, + EnumInfos: file_health_health_proto_enumTypes, + }.Build() + File_health_health_proto = out.File + file_health_health_proto_rawDesc = nil + file_health_health_proto_goTypes = nil + file_health_health_proto_depIdxs = nil +} diff --git a/health/health.proto b/health/health.proto new file mode 100644 index 000000000..a44f7b145 --- /dev/null +++ b/health/health.proto @@ -0,0 +1,12 @@ +syntax = "proto3"; + +package health; + +option go_package = "github.com/go-kratos/kratos/v2/health;health"; + +enum Status { + UNKNOWN = 0; + SERVING = 1; + NOT_SERVING = 2; + SERVICE_UNKNOWN = 3; +} \ No newline at end of file diff --git a/health/option.go b/health/option.go new file mode 100644 index 000000000..d4ebe0e8c --- /dev/null +++ b/health/option.go @@ -0,0 +1,15 @@ +package health + +import "time" + +type Option func(*options) + +type options struct { + watchTime time.Duration +} + +func WithWatchTime(t time.Duration) Option { + return func(o *options) { + o.watchTime = t + } +} diff --git a/transport/grpc/health/health.go b/transport/grpc/health/health.go new file mode 100644 index 000000000..4941a7a3e --- /dev/null +++ b/transport/grpc/health/health.go @@ -0,0 +1,99 @@ +package health + +import ( + "context" + + "github.com/go-kratos/kratos/v2" + "github.com/go-kratos/kratos/v2/errors" + "github.com/go-kratos/kratos/v2/health" + "github.com/google/uuid" + pb "google.golang.org/grpc/health/grpc_health_v1" +) + +type Server struct { + pb.UnimplementedHealthServer +} + +func NewServer() *Server { + return &Server{} +} + +func (s *Server) Check(ctx context.Context, req *pb.HealthCheckRequest) (*pb.HealthCheckResponse, error) { + info, ok := kratos.FromContext(ctx) + if !ok { + return nil, errors.InternalServer("kratos.FromContext(ctx) failed", "no info found in context") + } + status, ok := info.Health().GetStatus(req.Service) + if !ok { + status = health.Status_UNKNOWN + } + var sv pb.HealthCheckResponse_ServingStatus + switch status { + case health.Status_SERVING: + sv = pb.HealthCheckResponse_SERVING + case health.Status_NOT_SERVING: + sv = pb.HealthCheckResponse_NOT_SERVING + case health.Status_SERVICE_UNKNOWN: + sv = pb.HealthCheckResponse_SERVICE_UNKNOWN + default: + sv = pb.HealthCheckResponse_NOT_SERVING + } + return &pb.HealthCheckResponse{ + Status: sv, + }, nil +} + +func (s *Server) Watch(req *pb.HealthCheckRequest, ss pb.Health_WatchServer) (err error) { + ctx := ss.Context() + info, ok := kratos.FromContext(ctx) + if !ok { + return errors.InternalServer("get info failed", "") + } + uid, err := uuid.NewUUID() + if err != nil { + return errors.InternalServer("new uuid failed", err.Error()) + } + update := info.Health().Update(req.Service, uid.String()) + defer info.Health().DelUpdate(req.Service, uid.String()) + status, ok := info.Health().GetStatus(req.Service) + if !ok { + update <- health.Status_SERVICE_UNKNOWN + } else { + update <- status + } + ticker := info.Health().Ticker() + for { + select { + case <-ctx.Done(): + return nil + case status = <-update: + err := send(ss, status) + if err != nil { + return err + } + case <-ticker.C: + err := send(ss, status) + if err != nil { + return err + } + } + } +} + +func send(ss pb.Health_WatchServer, status health.Status) error { + var sv pb.HealthCheckResponse_ServingStatus + switch status { + case health.Status_SERVING: + sv = pb.HealthCheckResponse_SERVING + case health.Status_NOT_SERVING: + sv = pb.HealthCheckResponse_NOT_SERVING + case health.Status_SERVICE_UNKNOWN: + sv = pb.HealthCheckResponse_SERVICE_UNKNOWN + default: + sv = pb.HealthCheckResponse_NOT_SERVING + } + reply := &pb.HealthCheckResponse{ + Status: sv, + } + return ss.Send(reply) +} diff --git a/transport/grpc/server.go b/transport/grpc/server.go index caf4e6941..4ecdcd689 100644 --- a/transport/grpc/server.go +++ b/transport/grpc/server.go @@ -16,9 +16,9 @@ import ( "github.com/go-kratos/kratos/v2/middleware" "github.com/go-kratos/kratos/v2/transport" + "github.com/go-kratos/kratos/v2/transport/grpc/health" "google.golang.org/grpc" "google.golang.org/grpc/credentials" - "google.golang.org/grpc/health" "google.golang.org/grpc/health/grpc_health_v1" "google.golang.org/grpc/reflection" @@ -185,14 +185,14 @@ func (s *Server) Start(ctx context.Context) error { } s.baseCtx = ctx s.log.Infof("[gRPC] server listening on: %s", s.lis.Addr().String()) - s.health.Resume() + // s.health.Resume() return s.Serve(s.lis) } // Stop stop the gRPC server. func (s *Server) Stop(ctx context.Context) error { s.GracefulStop() - s.health.Shutdown() + // s.health.Shutdown() s.log.Info("[gRPC] server stopping") return nil }