parent
2fa9168bde
commit
f65a0a9134
@ -0,0 +1,43 @@ |
||||
package tracing |
||||
|
||||
import ( |
||||
"context" |
||||
|
||||
"github.com/go-kratos/kratos/v2" |
||||
"github.com/go-kratos/kratos/v2/metadata" |
||||
"go.opentelemetry.io/otel/propagation" |
||||
) |
||||
|
||||
const serviceHeader = "x-md-service-name" |
||||
|
||||
// Metadata is tracing metadata propagator
|
||||
type Metadata struct{} |
||||
|
||||
var _ propagation.TextMapPropagator = Metadata{} |
||||
|
||||
// Inject sets metadata key-values from ctx into the carrier.
|
||||
func (b Metadata) Inject(ctx context.Context, carrier propagation.TextMapCarrier) { |
||||
app, _ := kratos.FromContext(ctx) |
||||
carrier.Set(serviceHeader, app.Name()) |
||||
} |
||||
|
||||
// Extract returns a copy of parent with the metadata from the carrier added.
|
||||
func (b Metadata) Extract(parent context.Context, carrier propagation.TextMapCarrier) context.Context { |
||||
name := carrier.Get(serviceHeader) |
||||
if name != "" { |
||||
if md, ok := metadata.FromServerContext(parent); ok { |
||||
md.Set(serviceHeader, name) |
||||
} else { |
||||
md := metadata.New() |
||||
md.Set(serviceHeader, name) |
||||
parent = metadata.NewServerContext(parent, md) |
||||
} |
||||
} |
||||
|
||||
return parent |
||||
} |
||||
|
||||
// Fields returns the keys who's values are set with Inject.
|
||||
func (b Metadata) Fields() []string { |
||||
return []string{serviceHeader} |
||||
} |
@ -0,0 +1,144 @@ |
||||
package tracing |
||||
|
||||
import ( |
||||
"context" |
||||
"net" |
||||
"net/url" |
||||
"strings" |
||||
|
||||
"github.com/go-kratos/kratos/v2/metadata" |
||||
"github.com/go-kratos/kratos/v2/transport" |
||||
"github.com/go-kratos/kratos/v2/transport/http" |
||||
|
||||
"go.opentelemetry.io/otel/attribute" |
||||
semconv "go.opentelemetry.io/otel/semconv/v1.4.0" |
||||
"go.opentelemetry.io/otel/trace" |
||||
"google.golang.org/grpc/peer" |
||||
"google.golang.org/protobuf/proto" |
||||
) |
||||
|
||||
func setClientSpan(ctx context.Context, span trace.Span, m interface{}) { |
||||
attrs := []attribute.KeyValue{} |
||||
var remote string |
||||
var operation string |
||||
var rpcKind string |
||||
if tr, ok := transport.FromClientContext(ctx); ok { |
||||
operation = tr.Operation() |
||||
rpcKind = tr.Kind().String() |
||||
if tr.Kind() == transport.KindHTTP { |
||||
if ht, ok := tr.(*http.Transport); ok { |
||||
method := ht.Request().Method |
||||
route := ht.PathTemplate() |
||||
path := ht.Request().URL.Path |
||||
attrs = append(attrs, semconv.HTTPMethodKey.String(method)) |
||||
attrs = append(attrs, semconv.HTTPRouteKey.String(route)) |
||||
attrs = append(attrs, semconv.HTTPTargetKey.String(path)) |
||||
remote = ht.Request().Host |
||||
} |
||||
} else if tr.Kind() == transport.KindGRPC { |
||||
remote, _ = parseTarget(tr.Endpoint()) |
||||
} |
||||
} |
||||
attrs = append(attrs, semconv.RPCSystemKey.String(rpcKind)) |
||||
_, mAttrs := parseFullMethod(operation) |
||||
attrs = append(attrs, mAttrs...) |
||||
if remote != "" { |
||||
attrs = append(attrs, peerAttr(remote)...) |
||||
} |
||||
if p, ok := m.(proto.Message); ok { |
||||
attrs = append(attrs, attribute.Key("send_msg.size").Int(proto.Size(p))) |
||||
} |
||||
|
||||
span.SetAttributes(attrs...) |
||||
} |
||||
|
||||
func setServerSpan(ctx context.Context, span trace.Span, m interface{}) { |
||||
attrs := []attribute.KeyValue{} |
||||
var remote string |
||||
var operation string |
||||
var rpcKind string |
||||
if tr, ok := transport.FromServerContext(ctx); ok { |
||||
operation = tr.Operation() |
||||
rpcKind = tr.Kind().String() |
||||
if tr.Kind() == transport.KindHTTP { |
||||
if ht, ok := tr.(*http.Transport); ok { |
||||
method := ht.Request().Method |
||||
route := ht.PathTemplate() |
||||
path := ht.Request().URL.Path |
||||
attrs = append(attrs, semconv.HTTPMethodKey.String(method)) |
||||
attrs = append(attrs, semconv.HTTPRouteKey.String(route)) |
||||
attrs = append(attrs, semconv.HTTPTargetKey.String(path)) |
||||
remote = ht.Request().RemoteAddr |
||||
} |
||||
} else if tr.Kind() == transport.KindGRPC { |
||||
if p, ok := peer.FromContext(ctx); ok { |
||||
remote = p.Addr.String() |
||||
} |
||||
} |
||||
} |
||||
attrs = append(attrs, semconv.RPCSystemKey.String(rpcKind)) |
||||
_, mAttrs := parseFullMethod(operation) |
||||
attrs = append(attrs, mAttrs...) |
||||
attrs = append(attrs, peerAttr(remote)...) |
||||
if p, ok := m.(proto.Message); ok { |
||||
attrs = append(attrs, attribute.Key("recv_msg.size").Int(proto.Size(p))) |
||||
} |
||||
if md, ok := metadata.FromServerContext(ctx); ok { |
||||
attrs = append(attrs, semconv.PeerServiceKey.String(md.Get(serviceHeader))) |
||||
} |
||||
|
||||
span.SetAttributes(attrs...) |
||||
} |
||||
|
||||
// parseFullMethod returns a span name following the OpenTelemetry semantic
|
||||
// conventions as well as all applicable span attribute.KeyValue attributes based
|
||||
// on a gRPC's FullMethod.
|
||||
func parseFullMethod(fullMethod string) (string, []attribute.KeyValue) { |
||||
name := strings.TrimLeft(fullMethod, "/") |
||||
parts := strings.SplitN(name, "/", 2) |
||||
if len(parts) != 2 { |
||||
// Invalid format, does not follow `/package.service/method`.
|
||||
return name, []attribute.KeyValue{attribute.Key("rpc.operation").String(fullMethod)} |
||||
} |
||||
|
||||
var attrs []attribute.KeyValue |
||||
if service := parts[0]; service != "" { |
||||
attrs = append(attrs, semconv.RPCServiceKey.String(service)) |
||||
} |
||||
if method := parts[1]; method != "" { |
||||
attrs = append(attrs, semconv.RPCMethodKey.String(method)) |
||||
} |
||||
return name, attrs |
||||
} |
||||
|
||||
// peerAttr returns attributes about the peer address.
|
||||
func peerAttr(addr string) []attribute.KeyValue { |
||||
host, port, err := net.SplitHostPort(addr) |
||||
if err != nil { |
||||
return []attribute.KeyValue(nil) |
||||
} |
||||
|
||||
if host == "" { |
||||
host = "127.0.0.1" |
||||
} |
||||
|
||||
return []attribute.KeyValue{ |
||||
semconv.NetPeerIPKey.String(host), |
||||
semconv.NetPeerPortKey.String(port), |
||||
} |
||||
} |
||||
|
||||
func parseTarget(endpoint string) (address string, err error) { |
||||
var u *url.URL |
||||
u, err = url.Parse(endpoint) |
||||
if err != nil { |
||||
if u, err = url.Parse("http://" + endpoint); err != nil { |
||||
return "", err |
||||
} |
||||
return u.Host, nil |
||||
} |
||||
if len(u.Path) > 1 { |
||||
return u.Path[1:], nil |
||||
} |
||||
return endpoint, nil |
||||
} |
@ -0,0 +1,39 @@ |
||||
package tracing |
||||
|
||||
import ( |
||||
"context" |
||||
|
||||
"go.opentelemetry.io/otel/trace" |
||||
"google.golang.org/grpc/peer" |
||||
"google.golang.org/grpc/stats" |
||||
) |
||||
|
||||
// ClientHandler is tracing ClientHandler
|
||||
type ClientHandler struct { |
||||
} |
||||
|
||||
// HandleConn exists to satisfy gRPC stats.Handler.
|
||||
func (c *ClientHandler) HandleConn(ctx context.Context, cs stats.ConnStats) { |
||||
} |
||||
|
||||
// TagConn exists to satisfy gRPC stats.Handler.
|
||||
func (c *ClientHandler) TagConn(ctx context.Context, cti *stats.ConnTagInfo) context.Context { |
||||
return ctx |
||||
} |
||||
|
||||
// HandleRPC implements per-RPC tracing and stats instrumentation.
|
||||
func (c *ClientHandler) HandleRPC(ctx context.Context, rs stats.RPCStats) { |
||||
if _, ok := rs.(*stats.OutHeader); ok { |
||||
if p, ok := peer.FromContext(ctx); ok { |
||||
remoteAddr := p.Addr.String() |
||||
if span := trace.SpanFromContext(ctx); span.SpanContext().IsValid() { |
||||
span.SetAttributes(peerAttr(remoteAddr)...) |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
// TagRPC implements per-RPC context management.
|
||||
func (c *ClientHandler) TagRPC(ctx context.Context, rti *stats.RPCTagInfo) context.Context { |
||||
return ctx |
||||
} |
Loading…
Reference in new issue