From f65a0a9134907323a0228aa9842f3d6b8f706218 Mon Sep 17 00:00:00 2001 From: longxboy Date: Thu, 5 Aug 2021 18:51:20 +0800 Subject: [PATCH] feat: enhance tracing (#1300) * feat: enhance tracing * fix go lint --- examples/go.mod | 8 +- examples/go.sum | 9 ++ examples/traces/app/message/main.go | 32 ++++--- examples/traces/app/user/main.go | 41 ++++---- go.mod | 6 +- go.sum | 15 ++- middleware/tracing/metadata.go | 43 +++++++++ middleware/tracing/span.go | 144 ++++++++++++++++++++++++++++ middleware/tracing/statsHandler.go | 39 ++++++++ middleware/tracing/tracer.go | 45 +++++---- middleware/tracing/tracing.go | 19 ++-- middleware/tracing/tracing_test.go | 8 +- 12 files changed, 330 insertions(+), 79 deletions(-) create mode 100644 middleware/tracing/metadata.go create mode 100644 middleware/tracing/span.go create mode 100644 middleware/tracing/statsHandler.go diff --git a/examples/go.mod b/examples/go.mod index 6cdfc0756..21db4c997 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -32,10 +32,10 @@ require ( github.com/sirupsen/logrus v1.8.1 github.com/stretchr/testify v1.7.0 go.etcd.io/etcd/client/v3 v3.5.0 - go.opentelemetry.io/otel v1.0.0-RC1 - go.opentelemetry.io/otel/exporters/jaeger v1.0.0-RC1 - go.opentelemetry.io/otel/sdk v1.0.0-RC1 - go.opentelemetry.io/otel/trace v1.0.0-RC1 + go.opentelemetry.io/otel v1.0.0-RC2 + go.opentelemetry.io/otel/exporters/jaeger v1.0.0-RC2 + go.opentelemetry.io/otel/sdk v1.0.0-RC2 + go.opentelemetry.io/otel/trace v1.0.0-RC2 go.uber.org/zap v1.18.1 golang.org/x/text v0.3.6 google.golang.org/genproto v0.0.0-20210701191553-46259e63a0a9 diff --git a/examples/go.sum b/examples/go.sum index 7e1bc5751..0c54363fd 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -709,8 +709,12 @@ go.opentelemetry.io/otel v0.17.0/go.mod h1:Oqtdxmf7UtEvL037ohlgnaYa1h7GtMh0NcSd9 go.opentelemetry.io/otel v0.20.0/go.mod h1:Y3ugLH2oa81t5QO+Lty+zXf8zC9L26ax4Nzoxm/dooo= go.opentelemetry.io/otel v1.0.0-RC1 h1:4CeoX93DNTWt8awGK9JmNXzF9j7TyOu9upscEdtcdXc= go.opentelemetry.io/otel v1.0.0-RC1/go.mod h1:x9tRa9HK4hSSq7jf2TKbqFbtt58/TGk0f9XiEYISI1I= +go.opentelemetry.io/otel v1.0.0-RC2 h1:SHhxSjB+omnGZPgGlKe+QMp3MyazcOHdQ8qwo89oKbg= +go.opentelemetry.io/otel v1.0.0-RC2/go.mod h1:w1thVQ7qbAy8MHb0IFj8a5Q2QU0l2ksf8u/CN8m3NOM= go.opentelemetry.io/otel/exporters/jaeger v1.0.0-RC1 h1:tVhw2BMSAk248rhdeirOe9hlXKwGHDvVtF7P8F+H2DU= go.opentelemetry.io/otel/exporters/jaeger v1.0.0-RC1/go.mod h1:FXJnjGCoTQL6nQ8OpFJ0JI1DrdOvMoVx49ic0Hg4+D4= +go.opentelemetry.io/otel/exporters/jaeger v1.0.0-RC2 h1:RF0nWsIDpDBe+s06lkLxUw9CWQUAhO6hBSxxB7dz45s= +go.opentelemetry.io/otel/exporters/jaeger v1.0.0-RC2/go.mod h1:sZZqN3Vb0iT+NE6mZ1S7sNyH3t4PFk6ElK5TLGFBZ7E= go.opentelemetry.io/otel/internal/metric v0.21.0 h1:gZlIBo5O51hZOOZz8vEcuRx/l5dnADadKfpT70AELoo= go.opentelemetry.io/otel/internal/metric v0.21.0/go.mod h1:iOfAaY2YycsXfYD4kaRSbLx2LKmfpKObWBEv9QK5zFo= go.opentelemetry.io/otel/metric v0.17.0/go.mod h1:hUz9lH1rNXyEwWAhIWCMFWKhYtpASgSnObJFnU26dJ0= @@ -723,10 +727,14 @@ go.opentelemetry.io/otel/oteltest v1.0.0-RC1 h1:G685iP3XiskCwk/z0eIabL55XUl2gk0c go.opentelemetry.io/otel/oteltest v1.0.0-RC1/go.mod h1:+eoIG0gdEOaPNftuy1YScLr1Gb4mL/9lpDkZ0JjMRq4= go.opentelemetry.io/otel/sdk v1.0.0-RC1 h1:Sy2VLOOg24bipyC29PhuMXYNJrLsxkie8hyI7kUlG9Q= go.opentelemetry.io/otel/sdk v1.0.0-RC1/go.mod h1:kj6yPn7Pgt5ByRuwesbaWcRLA+V7BSDg3Hf8xRvsvf8= +go.opentelemetry.io/otel/sdk v1.0.0-RC2 h1:ROuteeSCBaZNjiT9JcFzZepmInDvLktR28Y6qKo8bCs= +go.opentelemetry.io/otel/sdk v1.0.0-RC2/go.mod h1:fgwHyiDn4e5k40TD9VX243rOxXR+jzsWBZYA2P5jpEw= go.opentelemetry.io/otel/trace v0.17.0/go.mod h1:bIujpqg6ZL6xUTubIUgziI1jSaUPthmabA/ygf/6Cfg= go.opentelemetry.io/otel/trace v0.20.0/go.mod h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16gUEi0Nf1iBdgw= go.opentelemetry.io/otel/trace v1.0.0-RC1 h1:jrjqKJZEibFrDz+umEASeU3LvdVyWKlnTh7XEfwrT58= go.opentelemetry.io/otel/trace v1.0.0-RC1/go.mod h1:86UHmyHWFEtWjfWPSbu0+d0Pf9Q6e1U+3ViBOc+NXAg= +go.opentelemetry.io/otel/trace v1.0.0-RC2 h1:dunAP0qDULMIT82atj34m5RgvsIK6LcsXf1c/MsYg1w= +go.opentelemetry.io/otel/trace v1.0.0-RC2/go.mod h1:JPQ+z6nNw9mqEGT8o3eoPTdnNI+Aj5JcxEsVGREIAy4= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.uber.org/atomic v1.3.2/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= go.uber.org/atomic v1.4.0/go.mod h1:gD2HeocX3+yG+ygLZcrzQJaqmWj9AIm7n08wl/qW/PE= @@ -922,6 +930,7 @@ golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210403161142-5e06dd20ab57/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1 h1:SrN+KX8Art/Sf4HNj6Zcz06G7VEz+7w9tdXTPOZ7+l4= diff --git a/examples/traces/app/message/main.go b/examples/traces/app/message/main.go index 4f81b025d..a882d8353 100644 --- a/examples/traces/app/message/main.go +++ b/examples/traces/app/message/main.go @@ -12,12 +12,12 @@ import ( "github.com/go-kratos/kratos/v2/middleware/recovery" "github.com/go-kratos/kratos/v2/middleware/tracing" "github.com/go-kratos/kratos/v2/transport/grpc" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/jaeger" "go.opentelemetry.io/otel/sdk/resource" tracesdk "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.4.0" - "go.opentelemetry.io/otel/trace" ) // go build -ldflags "-X main.Version=x.y.z" @@ -31,28 +31,28 @@ var ( // server is used to implement helloworld.GreeterServer. type server struct { v1.UnimplementedMessageServiceServer - tracer trace.TracerProvider } -// Get trace provider -func tracerProvider(url string) (*tracesdk.TracerProvider, error) { +// set trace provider +func setTracerProvider(url string) error { // Create the Jaeger exporter exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url))) if err != nil { - return nil, err + return err } tp := tracesdk.NewTracerProvider( - tracesdk.WithSampler(tracesdk.AlwaysSample()), + // Set the sampling rate based on the parent span to 100% + tracesdk.WithSampler(tracesdk.ParentBased(tracesdk.TraceIDRatioBased(1.0))), // Always be sure to batch in production. tracesdk.WithBatcher(exp), // Record information about this application in an Resource. tracesdk.WithResource(resource.NewSchemaless( - semconv.ServiceNameKey.String(v1.MessageService_ServiceDesc.ServiceName), - attribute.String("environment", "development"), - attribute.Int64("ID", 1), + semconv.ServiceNameKey.String(Name), + attribute.String("env", "dev"), )), ) - return tp, nil + otel.SetTracerProvider(tp) + return nil } func (s *server) GetUserMessage(ctx context.Context, request *v1.GetUserMessageRequest) (*v1.GetUserMessageReply, error) { @@ -69,12 +69,16 @@ func main() { logger = log.With(logger, "span_id", log.SpanID()) log := log.NewHelper(logger) - tp, err := tracerProvider("http://jaeger:14268/api/traces") + url := "http://jaeger:14268/api/traces" + if os.Getenv("jaeger_url") != "" { + url = os.Getenv("jaeger_url") + } + err := setTracerProvider(url) if err != nil { log.Error(err) } - s := &server{tracer: tp} + s := &server{} // grpc server grpcSrv := grpc.NewServer( grpc.Address(":9000"), @@ -82,9 +86,7 @@ func main() { middleware.Chain( recovery.Recovery(), // Configuring tracing Middleware - tracing.Server( - tracing.WithTracerProvider(tp), - ), + tracing.Server(), logging.Server(logger), ), )) diff --git a/examples/traces/app/user/main.go b/examples/traces/app/user/main.go index 48b838acc..0ca46932d 100644 --- a/examples/traces/app/user/main.go +++ b/examples/traces/app/user/main.go @@ -14,12 +14,14 @@ import ( "github.com/go-kratos/kratos/v2/middleware/tracing" "github.com/go-kratos/kratos/v2/transport/grpc" "github.com/go-kratos/kratos/v2/transport/http" + + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/jaeger" "go.opentelemetry.io/otel/sdk/resource" tracesdk "go.opentelemetry.io/otel/sdk/trace" semconv "go.opentelemetry.io/otel/semconv/v1.4.0" - "go.opentelemetry.io/otel/trace" + grpcx "google.golang.org/grpc" ) // go build -ldflags "-X main.Version=x.y.z" @@ -33,41 +35,42 @@ var ( // server is used to implement helloworld.GreeterServer. type server struct { v1.UnimplementedUserServer - tracer trace.TracerProvider } -// Get trace provider -func tracerProvider(url string) (*tracesdk.TracerProvider, error) { +// Set global trace provider +func setTracerProvider(url string) error { // Create the Jaeger exporter exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url))) if err != nil { - return nil, err + return err } tp := tracesdk.NewTracerProvider( - tracesdk.WithSampler(tracesdk.AlwaysSample()), + // Set the sampling rate based on the parent span to 100% + tracesdk.WithSampler(tracesdk.ParentBased(tracesdk.TraceIDRatioBased(1.0))), // Always be sure to batch in production. tracesdk.WithBatcher(exp), // Record information about this application in an Resource. tracesdk.WithResource(resource.NewSchemaless( - semconv.ServiceNameKey.String(v1.User_ServiceDesc.ServiceName), - attribute.String("environment", "development"), - attribute.Int64("ID", 1), + semconv.ServiceNameKey.String(Name), + attribute.String("env", "dev"), )), ) - return tp, nil + otel.SetTracerProvider(tp) + return nil } func (s *server) GetMyMessages(ctx context.Context, in *v1.GetMyMessagesRequest) (*v1.GetMyMessagesReply, error) { // create grpc conn + // only for demo, use single instance in production env conn, err := grpc.DialInsecure(ctx, grpc.WithEndpoint("127.0.0.1:9000"), grpc.WithMiddleware( recovery.Recovery(), - tracing.Client( - tracing.WithTracerProvider(s.tracer), - ), + tracing.Client(), ), grpc.WithTimeout(2*time.Second), + // for tracing remote ip recording + grpc.WithOptions(grpcx.WithStatsHandler(&tracing.ClientHandler{})), ) if err != nil { return nil, err @@ -91,7 +94,11 @@ func main() { logger = log.With(logger, "span_id", log.SpanID()) log := log.NewHelper(logger) - tp, err := tracerProvider("http://jaeger:14268/api/traces") + url := "http://jaeger:14268/api/traces" + if os.Getenv("jaeger_url") != "" { + url = os.Getenv("jaeger_url") + } + err := setTracerProvider(url) if err != nil { log.Error(err) } @@ -101,13 +108,11 @@ func main() { http.Middleware( recovery.Recovery(), // Configuring tracing middleware - tracing.Server( - tracing.WithTracerProvider(tp), - ), + tracing.Server(), logging.Server(logger), ), ) - s := &server{tracer: tp} + s := &server{} v1.RegisterUserHTTPServer(httpSrv, s) app := kratos.New( diff --git a/go.mod b/go.mod index 4705ecef1..2ada1a4df 100644 --- a/go.mod +++ b/go.mod @@ -9,9 +9,9 @@ require ( github.com/gorilla/mux v1.8.0 github.com/imdario/mergo v0.3.12 github.com/stretchr/testify v1.7.0 - go.opentelemetry.io/otel v1.0.0-RC1 - go.opentelemetry.io/otel/sdk v1.0.0-RC1 - go.opentelemetry.io/otel/trace v1.0.0-RC1 + go.opentelemetry.io/otel v1.0.0-RC2 + go.opentelemetry.io/otel/sdk v1.0.0-RC2 + go.opentelemetry.io/otel/trace v1.0.0-RC2 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c google.golang.org/genproto v0.0.0-20210629200056-84d6f6074151 google.golang.org/grpc v1.39.0 diff --git a/go.sum b/go.sum index 228273543..73930d1e9 100644 --- a/go.sum +++ b/go.sum @@ -64,14 +64,12 @@ github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5 github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/yuin/goldmark v1.3.5/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= -go.opentelemetry.io/otel v1.0.0-RC1 h1:4CeoX93DNTWt8awGK9JmNXzF9j7TyOu9upscEdtcdXc= -go.opentelemetry.io/otel v1.0.0-RC1/go.mod h1:x9tRa9HK4hSSq7jf2TKbqFbtt58/TGk0f9XiEYISI1I= -go.opentelemetry.io/otel/oteltest v1.0.0-RC1 h1:G685iP3XiskCwk/z0eIabL55XUl2gk0cljhGk9sB0Yk= -go.opentelemetry.io/otel/oteltest v1.0.0-RC1/go.mod h1:+eoIG0gdEOaPNftuy1YScLr1Gb4mL/9lpDkZ0JjMRq4= -go.opentelemetry.io/otel/sdk v1.0.0-RC1 h1:Sy2VLOOg24bipyC29PhuMXYNJrLsxkie8hyI7kUlG9Q= -go.opentelemetry.io/otel/sdk v1.0.0-RC1/go.mod h1:kj6yPn7Pgt5ByRuwesbaWcRLA+V7BSDg3Hf8xRvsvf8= -go.opentelemetry.io/otel/trace v1.0.0-RC1 h1:jrjqKJZEibFrDz+umEASeU3LvdVyWKlnTh7XEfwrT58= -go.opentelemetry.io/otel/trace v1.0.0-RC1/go.mod h1:86UHmyHWFEtWjfWPSbu0+d0Pf9Q6e1U+3ViBOc+NXAg= +go.opentelemetry.io/otel v1.0.0-RC2 h1:SHhxSjB+omnGZPgGlKe+QMp3MyazcOHdQ8qwo89oKbg= +go.opentelemetry.io/otel v1.0.0-RC2/go.mod h1:w1thVQ7qbAy8MHb0IFj8a5Q2QU0l2ksf8u/CN8m3NOM= +go.opentelemetry.io/otel/sdk v1.0.0-RC2 h1:ROuteeSCBaZNjiT9JcFzZepmInDvLktR28Y6qKo8bCs= +go.opentelemetry.io/otel/sdk v1.0.0-RC2/go.mod h1:fgwHyiDn4e5k40TD9VX243rOxXR+jzsWBZYA2P5jpEw= +go.opentelemetry.io/otel/trace v1.0.0-RC2 h1:dunAP0qDULMIT82atj34m5RgvsIK6LcsXf1c/MsYg1w= +go.opentelemetry.io/otel/trace v1.0.0-RC2/go.mod h1:JPQ+z6nNw9mqEGT8o3eoPTdnNI+Aj5JcxEsVGREIAy4= go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -108,6 +106,7 @@ golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210423185535-09eb48e85fd7/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007 h1:gG67DSER+11cZvqIMb8S8bt0vZtiN6xWYARwirrOSfE= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= diff --git a/middleware/tracing/metadata.go b/middleware/tracing/metadata.go new file mode 100644 index 000000000..ec5acb20b --- /dev/null +++ b/middleware/tracing/metadata.go @@ -0,0 +1,43 @@ +package tracing + +import ( + "context" + + "github.com/go-kratos/kratos/v2" + "github.com/go-kratos/kratos/v2/metadata" + "go.opentelemetry.io/otel/propagation" +) + +const serviceHeader = "x-md-service-name" + +// Metadata is tracing metadata propagator +type Metadata struct{} + +var _ propagation.TextMapPropagator = Metadata{} + +// Inject sets metadata key-values from ctx into the carrier. +func (b Metadata) Inject(ctx context.Context, carrier propagation.TextMapCarrier) { + app, _ := kratos.FromContext(ctx) + carrier.Set(serviceHeader, app.Name()) +} + +// Extract returns a copy of parent with the metadata from the carrier added. +func (b Metadata) Extract(parent context.Context, carrier propagation.TextMapCarrier) context.Context { + name := carrier.Get(serviceHeader) + if name != "" { + if md, ok := metadata.FromServerContext(parent); ok { + md.Set(serviceHeader, name) + } else { + md := metadata.New() + md.Set(serviceHeader, name) + parent = metadata.NewServerContext(parent, md) + } + } + + return parent +} + +// Fields returns the keys who's values are set with Inject. +func (b Metadata) Fields() []string { + return []string{serviceHeader} +} diff --git a/middleware/tracing/span.go b/middleware/tracing/span.go new file mode 100644 index 000000000..b8acf3708 --- /dev/null +++ b/middleware/tracing/span.go @@ -0,0 +1,144 @@ +package tracing + +import ( + "context" + "net" + "net/url" + "strings" + + "github.com/go-kratos/kratos/v2/metadata" + "github.com/go-kratos/kratos/v2/transport" + "github.com/go-kratos/kratos/v2/transport/http" + + "go.opentelemetry.io/otel/attribute" + semconv "go.opentelemetry.io/otel/semconv/v1.4.0" + "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc/peer" + "google.golang.org/protobuf/proto" +) + +func setClientSpan(ctx context.Context, span trace.Span, m interface{}) { + attrs := []attribute.KeyValue{} + var remote string + var operation string + var rpcKind string + if tr, ok := transport.FromClientContext(ctx); ok { + operation = tr.Operation() + rpcKind = tr.Kind().String() + if tr.Kind() == transport.KindHTTP { + if ht, ok := tr.(*http.Transport); ok { + method := ht.Request().Method + route := ht.PathTemplate() + path := ht.Request().URL.Path + attrs = append(attrs, semconv.HTTPMethodKey.String(method)) + attrs = append(attrs, semconv.HTTPRouteKey.String(route)) + attrs = append(attrs, semconv.HTTPTargetKey.String(path)) + remote = ht.Request().Host + } + } else if tr.Kind() == transport.KindGRPC { + remote, _ = parseTarget(tr.Endpoint()) + } + } + attrs = append(attrs, semconv.RPCSystemKey.String(rpcKind)) + _, mAttrs := parseFullMethod(operation) + attrs = append(attrs, mAttrs...) + if remote != "" { + attrs = append(attrs, peerAttr(remote)...) + } + if p, ok := m.(proto.Message); ok { + attrs = append(attrs, attribute.Key("send_msg.size").Int(proto.Size(p))) + } + + span.SetAttributes(attrs...) +} + +func setServerSpan(ctx context.Context, span trace.Span, m interface{}) { + attrs := []attribute.KeyValue{} + var remote string + var operation string + var rpcKind string + if tr, ok := transport.FromServerContext(ctx); ok { + operation = tr.Operation() + rpcKind = tr.Kind().String() + if tr.Kind() == transport.KindHTTP { + if ht, ok := tr.(*http.Transport); ok { + method := ht.Request().Method + route := ht.PathTemplate() + path := ht.Request().URL.Path + attrs = append(attrs, semconv.HTTPMethodKey.String(method)) + attrs = append(attrs, semconv.HTTPRouteKey.String(route)) + attrs = append(attrs, semconv.HTTPTargetKey.String(path)) + remote = ht.Request().RemoteAddr + } + } else if tr.Kind() == transport.KindGRPC { + if p, ok := peer.FromContext(ctx); ok { + remote = p.Addr.String() + } + } + } + attrs = append(attrs, semconv.RPCSystemKey.String(rpcKind)) + _, mAttrs := parseFullMethod(operation) + attrs = append(attrs, mAttrs...) + attrs = append(attrs, peerAttr(remote)...) + if p, ok := m.(proto.Message); ok { + attrs = append(attrs, attribute.Key("recv_msg.size").Int(proto.Size(p))) + } + if md, ok := metadata.FromServerContext(ctx); ok { + attrs = append(attrs, semconv.PeerServiceKey.String(md.Get(serviceHeader))) + } + + span.SetAttributes(attrs...) +} + +// parseFullMethod returns a span name following the OpenTelemetry semantic +// conventions as well as all applicable span attribute.KeyValue attributes based +// on a gRPC's FullMethod. +func parseFullMethod(fullMethod string) (string, []attribute.KeyValue) { + name := strings.TrimLeft(fullMethod, "/") + parts := strings.SplitN(name, "/", 2) + if len(parts) != 2 { + // Invalid format, does not follow `/package.service/method`. + return name, []attribute.KeyValue{attribute.Key("rpc.operation").String(fullMethod)} + } + + var attrs []attribute.KeyValue + if service := parts[0]; service != "" { + attrs = append(attrs, semconv.RPCServiceKey.String(service)) + } + if method := parts[1]; method != "" { + attrs = append(attrs, semconv.RPCMethodKey.String(method)) + } + return name, attrs +} + +// peerAttr returns attributes about the peer address. +func peerAttr(addr string) []attribute.KeyValue { + host, port, err := net.SplitHostPort(addr) + if err != nil { + return []attribute.KeyValue(nil) + } + + if host == "" { + host = "127.0.0.1" + } + + return []attribute.KeyValue{ + semconv.NetPeerIPKey.String(host), + semconv.NetPeerPortKey.String(port), + } +} + +func parseTarget(endpoint string) (address string, err error) { + var u *url.URL + u, err = url.Parse(endpoint) + if err != nil { + if u, err = url.Parse("http://" + endpoint); err != nil { + return "", err + } + return u.Host, nil + } + if len(u.Path) > 1 { + return u.Path[1:], nil + } + return endpoint, nil +} diff --git a/middleware/tracing/statsHandler.go b/middleware/tracing/statsHandler.go new file mode 100644 index 000000000..70c9efdc2 --- /dev/null +++ b/middleware/tracing/statsHandler.go @@ -0,0 +1,39 @@ +package tracing + +import ( + "context" + + "go.opentelemetry.io/otel/trace" + "google.golang.org/grpc/peer" + "google.golang.org/grpc/stats" +) + +// ClientHandler is tracing ClientHandler +type ClientHandler struct { +} + +// HandleConn exists to satisfy gRPC stats.Handler. +func (c *ClientHandler) HandleConn(ctx context.Context, cs stats.ConnStats) { +} + +// TagConn exists to satisfy gRPC stats.Handler. +func (c *ClientHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) context.Context { + return ctx +} + +// HandleRPC implements per-RPC tracing and stats instrumentation. +func (c *ClientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { + if _, ok := rs.(*stats.OutHeader); ok { + if p, ok := peer.FromContext(ctx); ok { + remoteAddr := p.Addr.String() + if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() { + span.SetAttributes(peerAttr(remoteAddr)...) + } + } + } +} + +// TagRPC implements per-RPC context management. +func (c *ClientHandler) TagRPC(ctx context.Context, rti *stats.RPCTagInfo) context.Context { + return ctx +} diff --git a/middleware/tracing/tracer.go b/middleware/tracing/tracer.go index e3ce2af42..8bad39878 100644 --- a/middleware/tracing/tracer.go +++ b/middleware/tracing/tracer.go @@ -4,70 +4,77 @@ import ( "context" "fmt" + "github.com/go-kratos/kratos/v2/errors" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" + "google.golang.org/protobuf/proto" ) // Tracer is otel span tracer type Tracer struct { tracer trace.Tracer kind trace.SpanKind + opt *options } // NewTracer create tracer instance func NewTracer(kind trace.SpanKind, opts ...Option) *Tracer { - options := options{} + options := options{ + propagator: propagation.NewCompositeTextMapPropagator(Metadata{}, propagation.Baggage{}, propagation.TraceContext{}), + } for _, o := range opts { o(&options) } - if options.TracerProvider != nil { - otel.SetTracerProvider(options.TracerProvider) - } - if options.Propagator != nil { - otel.SetTextMapPropagator(options.Propagator) - } else { - otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.Baggage{}, propagation.TraceContext{})) + if options.tracerProvider != nil { + otel.SetTracerProvider(options.tracerProvider) } + switch kind { case trace.SpanKindClient: - return &Tracer{tracer: otel.Tracer("client"), kind: kind} + return &Tracer{tracer: otel.Tracer("kartos"), kind: kind, opt: &options} case trace.SpanKindServer: - return &Tracer{tracer: otel.Tracer("server"), kind: kind} + return &Tracer{tracer: otel.Tracer("kartos"), kind: kind, opt: &options} default: panic(fmt.Sprintf("unsupported span kind: %v", kind)) } } // Start start tracing span -func (t *Tracer) Start(ctx context.Context, component string, operation string, carrier propagation.TextMapCarrier) (context.Context, trace.Span) { +func (t *Tracer) Start(ctx context.Context, operation string, carrier propagation.TextMapCarrier) (context.Context, trace.Span) { if t.kind == trace.SpanKindServer { - ctx = otel.GetTextMapPropagator().Extract(ctx, carrier) + ctx = t.opt.propagator.Extract(ctx, carrier) } ctx, span := t.tracer.Start(ctx, operation, - trace.WithAttributes(attribute.String("component", component)), trace.WithSpanKind(t.kind), ) if t.kind == trace.SpanKindClient { - otel.GetTextMapPropagator().Inject(ctx, carrier) + t.opt.propagator.Inject(ctx, carrier) } return ctx, span } // End finish tracing span -func (t *Tracer) End(ctx context.Context, span trace.Span, err error) { +func (t *Tracer) End(ctx context.Context, span trace.Span, m interface{}, err error) { if err != nil { span.RecordError(err) - span.SetAttributes( - attribute.String("event", "error"), - attribute.String("message", err.Error()), - ) + if e := errors.FromError(err); e != nil { + span.SetAttributes(attribute.Key("rpc.status_code").Int64(int64(e.Code))) + } span.SetStatus(codes.Error, err.Error()) } else { span.SetStatus(codes.Ok, "OK") } + + if p, ok := m.(proto.Message); ok { + if t.kind == trace.SpanKindServer { + span.SetAttributes(attribute.Key("send_msg.size").Int(proto.Size(p))) + } else { + span.SetAttributes(attribute.Key("recv_msg.size").Int(proto.Size(p))) + } + } span.End() } diff --git a/middleware/tracing/tracing.go b/middleware/tracing/tracing.go index 5f968724b..3e03c157c 100644 --- a/middleware/tracing/tracing.go +++ b/middleware/tracing/tracing.go @@ -13,21 +13,22 @@ import ( type Option func(*options) type options struct { - TracerProvider trace.TracerProvider - Propagator propagation.TextMapPropagator + tracerProvider trace.TracerProvider + propagator propagation.TextMapPropagator } // WithPropagator with tracer propagator. func WithPropagator(propagator propagation.TextMapPropagator) Option { return func(opts *options) { - opts.Propagator = propagator + opts.propagator = propagator } } // WithTracerProvider with tracer provider. +// Deprecated: use otel.SetTracerProvider(provider) instead. func WithTracerProvider(provider trace.TracerProvider) Option { return func(opts *options) { - opts.TracerProvider = provider + opts.tracerProvider = provider } } @@ -38,8 +39,9 @@ func Server(opts ...Option) middleware.Middleware { return func(ctx context.Context, req interface{}) (reply interface{}, err error) { if tr, ok := transport.FromServerContext(ctx); ok { var span trace.Span - ctx, span = tracer.Start(ctx, tr.Kind().String(), tr.Operation(), tr.RequestHeader()) - defer func() { tracer.End(ctx, span, err) }() + ctx, span = tracer.Start(ctx, tr.Operation(), tr.RequestHeader()) + setServerSpan(ctx, span, req) + defer func() { tracer.End(ctx, span, reply, err) }() } return handler(ctx, req) } @@ -53,8 +55,9 @@ func Client(opts ...Option) middleware.Middleware { return func(ctx context.Context, req interface{}) (reply interface{}, err error) { if tr, ok := transport.FromClientContext(ctx); ok { var span trace.Span - ctx, span = tracer.Start(ctx, tr.Kind().String(), tr.Operation(), tr.RequestHeader()) - defer func() { tracer.End(ctx, span, err) }() + ctx, span = tracer.Start(ctx, tr.Operation(), tr.RequestHeader()) + setClientSpan(ctx, span, req) + defer func() { tracer.End(ctx, span, reply, err) }() } return handler(ctx, req) } diff --git a/middleware/tracing/tracing_test.go b/middleware/tracing/tracing_test.go index 10d698807..7725c0308 100644 --- a/middleware/tracing/tracing_test.go +++ b/middleware/tracing/tracing_test.go @@ -57,15 +57,15 @@ func TestTracing(t *testing.T) { tracer := NewTracer(trace.SpanKindClient, WithTracerProvider(tp), WithPropagator(propagation.NewCompositeTextMapPropagator(propagation.Baggage{}, propagation.TraceContext{}))) ts := &Transport{kind: transport.KindHTTP, header: carrier} - ctx, aboveSpan := tracer.Start(transport.NewClientContext(context.Background(), ts), ts.Kind().String(), ts.Operation(), ts.RequestHeader()) - defer tracer.End(ctx, aboveSpan, nil) + ctx, aboveSpan := tracer.Start(transport.NewClientContext(context.Background(), ts), ts.Operation(), ts.RequestHeader()) + defer tracer.End(ctx, aboveSpan, nil, nil) // server use Extract fetch traceInfo from carrier tracer = NewTracer(trace.SpanKindServer, WithPropagator(propagation.NewCompositeTextMapPropagator(propagation.Baggage{}, propagation.TraceContext{}))) ts = &Transport{kind: transport.KindHTTP, header: carrier} - ctx, span := tracer.Start(transport.NewServerContext(ctx, ts), ts.Kind().String(), ts.Operation(), ts.RequestHeader()) - defer tracer.End(ctx, span, nil) + ctx, span := tracer.Start(transport.NewServerContext(ctx, ts), ts.Operation(), ts.RequestHeader()) + defer tracer.End(ctx, span, nil, nil) if aboveSpan.SpanContext().TraceID() != span.SpanContext().TraceID() { t.Fatalf("TraceID failed to deliver")