From 8f8b861f7d26c6749beb0070b63ddfe57d63b998 Mon Sep 17 00:00:00 2001 From: longxboy Date: Mon, 17 May 2021 17:23:41 +0800 Subject: [PATCH] add default propagation for trace (#919) * add default propagation for trace * add trace carrier --- middleware/tracing/carrier.go | 41 ++++++++++++++ middleware/tracing/tracer.go | 75 +++++++++++++++++++++++++ middleware/tracing/tracing.go | 103 ++++++---------------------------- 3 files changed, 134 insertions(+), 85 deletions(-) create mode 100644 middleware/tracing/carrier.go create mode 100644 middleware/tracing/tracer.go diff --git a/middleware/tracing/carrier.go b/middleware/tracing/carrier.go new file mode 100644 index 000000000..ebc58f0cc --- /dev/null +++ b/middleware/tracing/carrier.go @@ -0,0 +1,41 @@ +package tracing + +import ( + "google.golang.org/grpc/metadata" +) + +// MetadataCarrier is grpc metadata carrier +type MetadataCarrier metadata.MD + +// Get returns the value associated with the passed key. +func (mc MetadataCarrier) Get(key string) string { + values := metadata.MD(mc).Get(key) + if len(values) == 0 { + return "" + } + return values[0] +} + +// Set stores the key-value pair. +func (mc MetadataCarrier) Set(key string, value string) { + metadata.MD(mc).Set(key, value) +} + +// Keys lists the keys stored in this carrier. +func (mc MetadataCarrier) Keys() []string { + keys := make([]string, 0, metadata.MD(mc).Len()) + for key := range metadata.MD(mc) { + keys = append(keys, key) + } + return keys +} + +// Del delete key +func (mc MetadataCarrier) Del(key string) { + delete(mc, key) +} + +// Clone copy MetadataCarrier +func (mc MetadataCarrier) Clone() MetadataCarrier { + return MetadataCarrier(metadata.MD(mc).Copy()) +} diff --git a/middleware/tracing/tracer.go b/middleware/tracing/tracer.go new file mode 100644 index 000000000..c7d695b62 --- /dev/null +++ b/middleware/tracing/tracer.go @@ -0,0 +1,75 @@ +package tracing + +import ( + "context" + "fmt" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/propagation" + "go.opentelemetry.io/otel/trace" +) + +// Tracer is otel span tracer +type Tracer struct { + tracer trace.Tracer + kind trace.SpanKind +} + +// NewTracer create tracer instance +func NewTracer(kind trace.SpanKind, opts ...Option) *Tracer { + options := options{} + for _, o := range opts { + o(&options) + } + if options.TracerProvider != nil { + otel.SetTracerProvider(options.TracerProvider) + } + if options.Propagators != nil { + otel.SetTextMapPropagator(options.Propagators) + } else { + otel.SetTextMapPropagator(propagation.NewCompositeTextMapPropagator(propagation.Baggage{}, propagation.TraceContext{})) + } + var name string + if kind == trace.SpanKindServer { + name = "server" + } else if kind == trace.SpanKindClient { + name = "client" + } else { + panic(fmt.Sprintf("unsupported span kind: %v", kind)) + } + tracer := otel.Tracer(name) + return &Tracer{tracer: tracer, kind: kind} +} + +// Start start tracing span +func (t *Tracer) Start(ctx context.Context, component string, operation string, carrier propagation.TextMapCarrier) (context.Context, trace.Span) { + if t.kind == trace.SpanKindServer { + ctx = otel.GetTextMapPropagator().Extract(ctx, carrier) + } + ctx, span := t.tracer.Start(ctx, + operation, + trace.WithAttributes(attribute.String("component", component)), + trace.WithSpanKind(t.kind), + ) + if t.kind == trace.SpanKindClient { + otel.GetTextMapPropagator().Inject(ctx, carrier) + } + return ctx, span +} + +// End finish tracing span +func (t *Tracer) End(ctx context.Context, span trace.Span, err error) { + if err != nil { + span.RecordError(err) + span.SetAttributes( + attribute.String("event", "error"), + attribute.String("message", err.Error()), + ) + span.SetStatus(codes.Error, err.Error()) + } else { + span.SetStatus(codes.Ok, "OK") + } + span.End() +} diff --git a/middleware/tracing/tracing.go b/middleware/tracing/tracing.go index bdfaaf6e7..576b5d3fa 100644 --- a/middleware/tracing/tracing.go +++ b/middleware/tracing/tracing.go @@ -7,8 +7,6 @@ import ( "github.com/go-kratos/kratos/v2/transport/grpc" "github.com/go-kratos/kratos/v2/transport/http" "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/codes" "go.opentelemetry.io/otel/propagation" "go.opentelemetry.io/otel/trace" "google.golang.org/grpc/metadata" @@ -34,80 +32,36 @@ func WithTracerProvider(provider trace.TracerProvider) Option { } } -type MetadataCarrier struct { - md *metadata.MD -} - -var _ propagation.TextMapCarrier = &MetadataCarrier{} - -func (mc MetadataCarrier) Get(key string) string { - values := mc.md.Get(key) - if len(values) == 0 { - return "" - } - return values[0] -} - -func (mc MetadataCarrier) Set(key string, value string) { - mc.md.Set(key, value) -} - -func (mc MetadataCarrier) Keys() []string { - keys := make([]string, 0, mc.md.Len()) - for key := range *mc.md { - keys = append(keys, key) - } - return keys -} - // Server returns a new server middleware for OpenTelemetry. func Server(opts ...Option) middleware.Middleware { - options := options{} - for _, o := range opts { - o(&options) - } - if options.TracerProvider != nil { - otel.SetTracerProvider(options.TracerProvider) - } - if options.Propagators != nil { - otel.SetTextMapPropagator(options.Propagators) - } - tracer := otel.Tracer("server") + tracer := NewTracer(trace.SpanKindServer, opts...) + return func(handler middleware.Handler) middleware.Handler { return func(ctx context.Context, req interface{}) (reply interface{}, err error) { var ( component string operation string + carrier propagation.TextMapCarrier ) if info, ok := http.FromServerContext(ctx); ok { // HTTP span component = "HTTP" operation = info.Request.RequestURI + carrier = propagation.HeaderCarrier(info.Request.Header) ctx = otel.GetTextMapPropagator().Extract(ctx, propagation.HeaderCarrier(info.Request.Header)) } else if info, ok := grpc.FromServerContext(ctx); ok { // gRPC span component = "gRPC" operation = info.FullMethod if md, ok := metadata.FromIncomingContext(ctx); ok { - ctx = otel.GetTextMapPropagator().Extract(ctx, MetadataCarrier{md: &md}) + carrier = MetadataCarrier(md) } } - ctx, span := tracer.Start(ctx, - operation, - trace.WithAttributes(attribute.String("component", component)), - trace.WithSpanKind(trace.SpanKindServer), - ) - defer span.End() - if reply, err = handler(ctx, req); err != nil { - span.RecordError(err) - span.SetAttributes( - attribute.String("event", "error"), - attribute.String("message", err.Error()), - ) - span.SetStatus(codes.Error, err.Error()) - } else { - span.SetStatus(codes.Ok, "OK") - } + ctx, span := tracer.Start(ctx, component, operation, carrier) + defer tracer.End(ctx, span, err) + + reply, err = handler(ctx, req) + return } } @@ -115,17 +69,8 @@ func Server(opts ...Option) middleware.Middleware { // Client returns a new client middleware for OpenTelemetry. func Client(opts ...Option) middleware.Middleware { - options := options{} - for _, o := range opts { - o(&options) - } - if options.TracerProvider != nil { - otel.SetTracerProvider(options.TracerProvider) - } - if options.Propagators != nil { - otel.SetTextMapPropagator(options.Propagators) - } - tracer := otel.Tracer("client") + tracer := NewTracer(trace.SpanKindClient, opts...) + return func(handler middleware.Handler) middleware.Handler { return func(ctx context.Context, req interface{}) (reply interface{}, err error) { var ( @@ -146,26 +91,14 @@ func Client(opts ...Option) middleware.Middleware { if !ok { md = metadata.Pairs() } - carrier = MetadataCarrier{md: &md} + carrier = MetadataCarrier(md) ctx = metadata.NewOutgoingContext(ctx, md) } - ctx, span := tracer.Start(ctx, - operation, - trace.WithAttributes(attribute.String("component", component)), - trace.WithSpanKind(trace.SpanKindClient), - ) - defer span.End() - otel.GetTextMapPropagator().Inject(ctx, carrier) - if reply, err = handler(ctx, req); err != nil { - span.RecordError(err) - span.SetAttributes( - attribute.String("event", "error"), - attribute.String("message", err.Error()), - ) - span.SetStatus(codes.Error, err.Error()) - } else { - span.SetStatus(codes.Ok, "OK") - } + ctx, span := tracer.Start(ctx, component, operation, carrier) + defer tracer.End(ctx, span, err) + + reply, err = handler(ctx, req) + return } }