// Source: kratos/pkg/net/trace/dapper.go (190 lines, 4.2 KiB).

package trace
import (
"log"
"os"
"sync"
"time"
)
const (
	// _maxLevel is the deepest parent level newSpanWithContext accepts; a
	// parent context whose Level exceeds it yields a noop span instead.
	_maxLevel = 64
	// _probability is the hard-coded probability handed to newSampler by
	// NewTracer: 0.00025, i.e. 1/4000.
	_probability = 0.00025
)
6 years ago
// NewTracer new a tracer.
func NewTracer(serviceName string, report reporter, disableSample bool) Tracer {
sampler := newSampler(_probability)
6 years ago
// default internal tags
tags := extendTag()
stdlog := log.New(os.Stderr, "trace", log.LstdFlags)
return &dapper{
6 years ago
serviceName: serviceName,
disableSample: disableSample,
6 years ago
propagators: map[interface{}]propagator{
HTTPFormat: httpPropagator{},
GRPCFormat: grpcPropagator{},
},
reporter: report,
sampler: sampler,
tags: tags,
6 years ago
pool: &sync.Pool{New: func() interface{} { return new(Span) }},
6 years ago
stdlog: stdlog,
}
}
type dapper struct {
6 years ago
serviceName string
disableSample bool
tags []Tag
reporter reporter
propagators map[interface{}]propagator
pool *sync.Pool
stdlog *log.Logger
sampler sampler
6 years ago
}
func (d *dapper) New(operationName string, opts ...Option) Trace {
opt := defaultOption
for _, fn := range opts {
fn(&opt)
}
traceID := genID()
var sampled bool
var probability float32
6 years ago
if d.disableSample {
6 years ago
sampled = true
probability = 1
} else {
sampled, probability = d.sampler.IsSampled(traceID, operationName)
}
6 years ago
pctx := spanContext{TraceID: traceID}
6 years ago
if sampled {
6 years ago
pctx.Flags = flagSampled
pctx.Probability = probability
6 years ago
}
if opt.Debug {
6 years ago
pctx.Flags |= flagDebug
6 years ago
return d.newSpanWithContext(operationName, pctx).SetTag(TagString(TagSpanKind, "server")).SetTag(TagBool("debug", true))
}
// 为了兼容临时为 New 的 Span 设置 span.kind
return d.newSpanWithContext(operationName, pctx).SetTag(TagString(TagSpanKind, "server"))
}
func (d *dapper) newSpanWithContext(operationName string, pctx spanContext) Trace {
sp := d.getSpan()
// is span is not sampled just return a span with this context, no need clear it
//if !pctx.isSampled() {
// sp.context = pctx
// return sp
//}
6 years ago
if pctx.Level > _maxLevel {
6 years ago
// if span reach max limit level return noopspan
return noopspan{}
}
6 years ago
level := pctx.Level + 1
6 years ago
nctx := spanContext{
6 years ago
TraceID: pctx.TraceID,
ParentID: pctx.SpanID,
Flags: pctx.Flags,
Level: level,
6 years ago
}
6 years ago
if pctx.SpanID == 0 {
nctx.SpanID = pctx.TraceID
6 years ago
} else {
6 years ago
nctx.SpanID = genID()
6 years ago
}
sp.operationName = operationName
sp.context = nctx
sp.startTime = time.Now()
sp.tags = append(sp.tags, d.tags...)
return sp
}
// Inject passes the carrier's Set function to t.Visit so the trace can write
// its fields into carrier.
//
// If carrier implements Carrier it is used directly and format is ignored;
// otherwise the built-in propagator registered for format wraps carrier.
// It returns ErrUnsupportedFormat when no propagator is registered for
// format, or the error returned by the propagator. A nil t writes nothing.
func (d *dapper) Inject(t Trace, format interface{}, carrier interface{}) error {
	// if carrier implement Carrier use direct, ignore format
	if carr, ok := carrier.(Carrier); ok {
		// Guard against a nil trace here too, matching the propagator
		// path below; calling Visit on a nil Trace would panic.
		if t != nil {
			t.Visit(carr.Set)
		}
		return nil
	}
	// use Built-in propagators
	pp, ok := d.propagators[format]
	if !ok {
		return ErrUnsupportedFormat
	}
	carr, err := pp.Inject(carrier)
	if err != nil {
		return err
	}
	if t != nil {
		t.Visit(carr.Set)
	}
	return nil
}
// Extract builds a span from the trace data found in carrier (see extract)
// and tags it with span.kind "server". On failure it returns extract's
// result and error unchanged.
func (d *dapper) Extract(format interface{}, carrier interface{}) (Trace, error) {
	extracted, err := d.extract(format, carrier)
	if err != nil {
		return extracted, err
	}
	// For compatibility, temporarily set span.kind on the extracted span,
	// as is done for spans created by New.
	kind := TagString(TagSpanKind, "server")
	return extracted.SetTag(kind), nil
}
// extract reads the KratosTraceID value from carrier, parses it into a span
// context, and returns a new span (with an empty operation name) created
// under that context.
//
// A carrier implementing Carrier is read directly and format is ignored;
// otherwise the built-in propagator for format converts it first. It returns
// ErrUnsupportedFormat when no propagator is registered for format, or the
// propagator's / parser's error.
func (d *dapper) extract(format interface{}, carrier interface{}) (Trace, error) {
	carr, isCarrier := carrier.(Carrier)
	if !isCarrier {
		// Not a Carrier: fall back to the built-in propagators.
		pp, found := d.propagators[format]
		if !found {
			return nil, ErrUnsupportedFormat
		}
		wrapped, err := pp.Extract(carrier)
		if err != nil {
			return nil, err
		}
		carr = wrapped
	}
	parent, err := contextFromString(carr.Get(KratosTraceID))
	if err != nil {
		return nil, err
	}
	// NOTE: call SetTitle after extract trace
	return d.newSpanWithContext("", parent), nil
}
// Close closes the tracer's reporter and returns the error it reports, if any.
func (d *dapper) Close() error {
	err := d.reporter.Close()
	return err
}
6 years ago
func (d *dapper) report(sp *Span) {
6 years ago
if sp.context.isSampled() {
if err := d.reporter.WriteSpan(sp); err != nil {
d.stdlog.Printf("marshal trace span error: %s", err)
}
}
d.putSpan(sp)
}
6 years ago
func (d *dapper) putSpan(sp *Span) {
6 years ago
if len(sp.tags) > 32 {
sp.tags = nil
}
if len(sp.logs) > 32 {
sp.logs = nil
}
d.pool.Put(sp)
}
6 years ago
func (d *dapper) getSpan() *Span {
sp := d.pool.Get().(*Span)
6 years ago
sp.dapper = d
sp.childs = 0
sp.tags = sp.tags[:0]
sp.logs = sp.logs[:0]
return sp
}