fix zipkin kind

pull/209/head
Tony 5 years ago
parent f6629f235d
commit ce5990b96f
  1. 46
      pkg/net/trace/context.go
  2. 26
      pkg/net/trace/dapper.go
  3. 8
      pkg/net/trace/marshal.go
  4. 16
      pkg/net/trace/span.go
  5. 21
      pkg/net/trace/zipkin/zipkin.go
  6. 25
      pkg/net/trace/zipkin/zipkin_test.go

@ -19,39 +19,39 @@ var (
// SpanContext implements opentracing.SpanContext // SpanContext implements opentracing.SpanContext
type spanContext struct { type spanContext struct {
// traceID represents globally unique ID of the trace. // TraceID represents globally unique ID of the trace.
// Usually generated as a random number. // Usually generated as a random number.
traceID uint64 TraceID uint64
// spanID represents span ID that must be unique within its trace, // SpanID represents span ID that must be unique within its trace,
// but does not have to be globally unique. // but does not have to be globally unique.
spanID uint64 SpanID uint64
// parentID refers to the ID of the parent span. // ParentID refers to the ID of the parent span.
// Should be 0 if the current span is a root span. // Should be 0 if the current span is a root span.
parentID uint64 ParentID uint64
// flags is a bitmap containing such bits as 'sampled' and 'debug'. // Flags is a bitmap containing such bits as 'sampled' and 'debug'.
flags byte Flags byte
// probability // Probability
probability float32 Probability float32
// current level // Level current level
level int Level int
} }
func (c spanContext) isSampled() bool { func (c spanContext) isSampled() bool {
return (c.flags & flagSampled) == flagSampled return (c.Flags & flagSampled) == flagSampled
} }
func (c spanContext) isDebug() bool { func (c spanContext) isDebug() bool {
return (c.flags & flagDebug) == flagDebug return (c.Flags & flagDebug) == flagDebug
} }
// IsValid check spanContext valid // IsValid check spanContext valid
func (c spanContext) IsValid() bool { func (c spanContext) IsValid() bool {
return c.traceID != 0 && c.spanID != 0 return c.TraceID != 0 && c.SpanID != 0
} }
// emptyContext emptyContext // emptyContext emptyContext
@ -69,10 +69,10 @@ var emptyContext = spanContext{}
// sample-rate: s-{base16(BigEndian(float32))} // sample-rate: s-{base16(BigEndian(float32))}
func (c spanContext) String() string { func (c spanContext) String() string {
base := make([]string, 4) base := make([]string, 4)
base[0] = strconv.FormatUint(uint64(c.traceID), 16) base[0] = strconv.FormatUint(uint64(c.TraceID), 16)
base[1] = strconv.FormatUint(uint64(c.spanID), 16) base[1] = strconv.FormatUint(uint64(c.SpanID), 16)
base[2] = strconv.FormatUint(uint64(c.parentID), 16) base[2] = strconv.FormatUint(uint64(c.ParentID), 16)
base[3] = strconv.FormatUint(uint64(c.flags), 16) base[3] = strconv.FormatUint(uint64(c.Flags), 16)
return strings.Join(base, ":") return strings.Join(base, ":")
} }
@ -101,10 +101,10 @@ func contextFromString(value string) (spanContext, error) {
return emptyContext, errInvalidTracerString return emptyContext, errInvalidTracerString
} }
sctx := spanContext{ sctx := spanContext{
traceID: rets[0], TraceID: rets[0],
spanID: rets[1], SpanID: rets[1],
parentID: rets[2], ParentID: rets[2],
flags: byte(rets[3]), Flags: byte(rets[3]),
} }
return sctx, nil return sctx, nil
} }

@ -60,13 +60,13 @@ func (d *dapper) New(operationName string, opts ...Option) Trace {
} else { } else {
sampled, probability = d.sampler.IsSampled(traceID, operationName) sampled, probability = d.sampler.IsSampled(traceID, operationName)
} }
pctx := spanContext{traceID: traceID} pctx := spanContext{TraceID: traceID}
if sampled { if sampled {
pctx.flags = flagSampled pctx.Flags = flagSampled
pctx.probability = probability pctx.Probability = probability
} }
if opt.Debug { if opt.Debug {
pctx.flags |= flagDebug pctx.Flags |= flagDebug
return d.newSpanWithContext(operationName, pctx).SetTag(TagString(TagSpanKind, "server")).SetTag(TagBool("debug", true)) return d.newSpanWithContext(operationName, pctx).SetTag(TagString(TagSpanKind, "server")).SetTag(TagBool("debug", true))
} }
// 为了兼容临时为 New 的 Span 设置 span.kind // 为了兼容临时为 New 的 Span 设置 span.kind
@ -80,21 +80,21 @@ func (d *dapper) newSpanWithContext(operationName string, pctx spanContext) Trac
// sp.context = pctx // sp.context = pctx
// return sp // return sp
//} //}
if pctx.level > _maxLevel { if pctx.Level > _maxLevel {
// if span reach max limit level return noopspan // if span reach max limit level return noopspan
return noopspan{} return noopspan{}
} }
level := pctx.level + 1 level := pctx.Level + 1
nctx := spanContext{ nctx := spanContext{
traceID: pctx.traceID, TraceID: pctx.TraceID,
parentID: pctx.spanID, ParentID: pctx.SpanID,
flags: pctx.flags, Flags: pctx.Flags,
level: level, Level: level,
} }
if pctx.spanID == 0 { if pctx.SpanID == 0 {
nctx.spanID = pctx.traceID nctx.SpanID = pctx.TraceID
} else { } else {
nctx.spanID = genID() nctx.SpanID = genID()
} }
sp.operationName = operationName sp.operationName = operationName
sp.context = nctx sp.context = nctx

@ -32,10 +32,10 @@ func marshalSpanV1(sp *Span) ([]byte, error) {
protoSpan.Version = protoVersion1 protoSpan.Version = protoVersion1
protoSpan.ServiceName = sp.dapper.serviceName protoSpan.ServiceName = sp.dapper.serviceName
protoSpan.OperationName = sp.operationName protoSpan.OperationName = sp.operationName
protoSpan.TraceId = sp.context.traceID protoSpan.TraceId = sp.context.TraceID
protoSpan.SpanId = sp.context.spanID protoSpan.SpanId = sp.context.SpanID
protoSpan.ParentId = sp.context.parentID protoSpan.ParentId = sp.context.ParentID
protoSpan.SamplingProbability = sp.context.probability protoSpan.SamplingProbability = sp.context.Probability
protoSpan.StartTime = &timestamp.Timestamp{ protoSpan.StartTime = &timestamp.Timestamp{
Seconds: sp.startTime.Unix(), Seconds: sp.startTime.Unix(),
Nanos: int32(sp.startTime.Nanosecond()), Nanos: int32(sp.startTime.Nanosecond()),

@ -43,22 +43,18 @@ func (s *Span) TraceID() string {
return s.context.String() return s.context.String()
} }
func (s *Span) Tid() uint64 { func (s *Span) Context() spanContext {
return s.context.traceID return s.context
}
func (s *Span) SpanID() uint64 {
return s.context.spanID
}
func (s *Span) ParentID() uint64 {
return s.context.parentID
} }
func (s *Span) Tags() []Tag { func (s *Span) Tags() []Tag {
return s.tags return s.tags
} }
func (s *Span) Logs() []*protogen.Log {
return s.logs
}
func (s *Span) Fork(serviceName, operationName string) Trace { func (s *Span) Fork(serviceName, operationName string) Trace {
if s.childs > _maxChilds { if s.childs > _maxChilds {
// if child span more than max childs set return noopspan // if child span more than max childs set return noopspan

@ -25,9 +25,10 @@ func newReport(c *Config) *report {
// WriteSpan write a trace span to queue. // WriteSpan write a trace span to queue.
func (r *report) WriteSpan(raw *trace.Span) (err error) { func (r *report) WriteSpan(raw *trace.Span) (err error) {
traceID := model.TraceID{Low: raw.Tid()} ctx := raw.Context()
spanID := model.ID(raw.SpanID()) traceID := model.TraceID{Low: ctx.TraceID}
parentID := model.ID(raw.ParentID()) spanID := model.ID(ctx.SpanID)
parentID := model.ID(ctx.ParentID)
span := model.SpanModel{ span := model.SpanModel{
SpanContext: model.SpanContext{ SpanContext: model.SpanContext{
TraceID: traceID, TraceID: traceID,
@ -42,7 +43,16 @@ func (r *report) WriteSpan(raw *trace.Span) (err error) {
for _, tag := range raw.Tags() { for _, tag := range raw.Tags() {
switch tag.Key { switch tag.Key {
case trace.TagSpanKind: case trace.TagSpanKind:
span.Kind = model.Kind(tag.Value.(string)) switch tag.Value.(string) {
case "client":
span.Kind = model.Client
case "server":
span.Kind = model.Server
case "producer":
span.Kind = model.Producer
case "consumer":
span.Kind = model.Consumer
}
case trace.TagPeerService: case trace.TagPeerService:
span.LocalEndpoint = &model.Endpoint{ServiceName: tag.Value.(string)} span.LocalEndpoint = &model.Endpoint{ServiceName: tag.Value.(string)}
default: default:
@ -54,6 +64,9 @@ func (r *report) WriteSpan(raw *trace.Span) (err error) {
} }
} }
} }
for _, lg := range raw.Logs() {
span.Tags[lg.Key] = string(lg.Value)
}
r.rpt.Send(span) r.rpt.Send(span)
return return
} }

@ -1,7 +1,9 @@
package zipkin package zipkin
import ( import (
"io/ioutil"
"net/http" "net/http"
"net/http/httptest"
"testing" "testing"
"time" "time"
@ -10,18 +12,35 @@ import (
) )
func TestZipkin(t *testing.T) { func TestZipkin(t *testing.T) {
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.Method != "POST" {
t.Errorf("expected 'POST' request, got '%s'", r.Method)
}
aSpanPayload, err := ioutil.ReadAll(r.Body)
if err != nil {
t.Errorf("unexpected error: %s", err.Error())
}
t.Logf("%s\n", aSpanPayload)
}))
defer ts.Close()
c := &Config{ c := &Config{
Endpoint: "http://127.0.0.1:9411/api/v2/spans", Endpoint: ts.URL,
Timeout: xtime.Duration(time.Second * 5), Timeout: xtime.Duration(time.Second * 5),
BatchSize: 100, BatchSize: 100,
} }
//c.Endpoint = "http://127.0.0.1:9411/api/v2/spans"
report := newReport(c) report := newReport(c)
t1 := trace.NewTracer("service1", report, true) t1 := trace.NewTracer("service1", report, true)
t2 := trace.NewTracer("service2", report, true) t2 := trace.NewTracer("service2", report, true)
sp1 := t1.New("opt_1") sp1 := t1.New("option_1")
sp2 := sp1.Fork("", "opt_client") sp2 := sp1.Fork("service3", "opt_client")
// inject
header := make(http.Header) header := make(http.Header)
t1.Inject(sp2, trace.HTTPFormat, header) t1.Inject(sp2, trace.HTTPFormat, header)
t.Log(header)
sp3, err := t2.Extract(trace.HTTPFormat, header) sp3, err := t2.Extract(trace.HTTPFormat, header)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)

Loading…
Cancel
Save