fix import pkg (#651)
parent
25db576747
commit
f831dc0b55
@ -0,0 +1,11 @@ |
||||
package jaeger |
||||
|
||||
import ( |
||||
"github.com/go-kratos/kratos/pkg/conf/env" |
||||
"github.com/go-kratos/kratos/pkg/net/trace" |
||||
) |
||||
|
||||
func Init() { |
||||
c := &Config{Endpoint: "http://127.0.0.1:9191", BatchSize: 120} |
||||
trace.SetGlobalTracer(trace.NewTracer(env.AppID, newReport(c), true)) |
||||
} |
@ -0,0 +1,314 @@ |
||||
package jaeger |
||||
|
||||
import ( |
||||
"bytes" |
||||
"fmt" |
||||
"io" |
||||
"io/ioutil" |
||||
"net/http" |
||||
"time" |
||||
|
||||
"github.com/opentracing/opentracing-go" |
||||
ja "github.com/uber/jaeger-client-go" |
||||
"github.com/uber/jaeger-client-go/thrift" |
||||
j "github.com/uber/jaeger-client-go/thrift-gen/jaeger" |
||||
) |
||||
|
||||
// Default timeout for http request in seconds
|
||||
const defaultHTTPTimeout = time.Second * 5 |
||||
|
||||
// HTTPTransport implements Transport by forwarding spans to a http server.
|
||||
type HTTPTransport struct { |
||||
url string |
||||
client *http.Client |
||||
batchSize int |
||||
spans []*j.Span |
||||
process *j.Process |
||||
httpCredentials *HTTPBasicAuthCredentials |
||||
headers map[string]string |
||||
} |
||||
|
||||
// HTTPBasicAuthCredentials stores credentials for HTTP basic auth.
|
||||
type HTTPBasicAuthCredentials struct { |
||||
username string |
||||
password string |
||||
} |
||||
|
||||
// HTTPOption sets a parameter for the HttpCollector
|
||||
type HTTPOption func(c *HTTPTransport) |
||||
|
||||
// HTTPTimeout sets maximum timeout for http request.
|
||||
func HTTPTimeout(duration time.Duration) HTTPOption { |
||||
return func(c *HTTPTransport) { c.client.Timeout = duration } |
||||
} |
||||
|
||||
// HTTPBatchSize sets the maximum batch size, after which a collect will be
|
||||
// triggered. The default batch size is 100 spans.
|
||||
func HTTPBatchSize(n int) HTTPOption { |
||||
return func(c *HTTPTransport) { c.batchSize = n } |
||||
} |
||||
|
||||
// HTTPBasicAuth sets the credentials required to perform HTTP basic auth
|
||||
func HTTPBasicAuth(username string, password string) HTTPOption { |
||||
return func(c *HTTPTransport) { |
||||
c.httpCredentials = &HTTPBasicAuthCredentials{username: username, password: password} |
||||
} |
||||
} |
||||
|
||||
// HTTPRoundTripper configures the underlying Transport on the *http.Client
|
||||
// that is used
|
||||
func HTTPRoundTripper(transport http.RoundTripper) HTTPOption { |
||||
return func(c *HTTPTransport) { |
||||
c.client.Transport = transport |
||||
} |
||||
} |
||||
|
||||
// HTTPHeaders defines the HTTP headers that will be attached to the jaeger client's HTTP request
|
||||
func HTTPHeaders(headers map[string]string) HTTPOption { |
||||
return func(c *HTTPTransport) { |
||||
c.headers = headers |
||||
} |
||||
} |
||||
|
||||
// NewHTTPTransport returns a new HTTP-backend transport. url should be an http
|
||||
// url of the collector to handle POST request, typically something like:
|
||||
// http://hostname:14268/api/traces?format=jaeger.thrift
|
||||
func NewHTTPTransport(url string, options ...HTTPOption) *HTTPTransport { |
||||
c := &HTTPTransport{ |
||||
url: url, |
||||
client: &http.Client{Timeout: defaultHTTPTimeout}, |
||||
batchSize: 100, |
||||
spans: []*j.Span{}, |
||||
} |
||||
|
||||
for _, option := range options { |
||||
option(c) |
||||
} |
||||
return c |
||||
} |
||||
|
||||
// Append implements Transport.
|
||||
func (c *HTTPTransport) Append(span *Span) (int, error) { |
||||
if c.process == nil { |
||||
process := j.NewProcess() |
||||
process.ServiceName = span.ServiceName() |
||||
c.process = process |
||||
} |
||||
jSpan := BuildJaegerThrift(span) |
||||
c.spans = append(c.spans, jSpan) |
||||
if len(c.spans) >= c.batchSize { |
||||
return c.Flush() |
||||
} |
||||
return 0, nil |
||||
} |
||||
|
||||
// Flush implements Transport.
|
||||
func (c *HTTPTransport) Flush() (int, error) { |
||||
count := len(c.spans) |
||||
if count == 0 { |
||||
return 0, nil |
||||
} |
||||
err := c.send(c.spans) |
||||
c.spans = c.spans[:0] |
||||
return count, err |
||||
} |
||||
|
||||
// Close implements Transport.
|
||||
func (c *HTTPTransport) Close() error { |
||||
return nil |
||||
} |
||||
|
||||
func (c *HTTPTransport) send(spans []*j.Span) error { |
||||
batch := &j.Batch{ |
||||
Spans: spans, |
||||
Process: c.process, |
||||
} |
||||
body, err := serializeThrift(batch) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
req, err := http.NewRequest("POST", c.url, body) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
req.Header.Set("Content-Type", "application/x-thrift") |
||||
for k, v := range c.headers { |
||||
req.Header.Set(k, v) |
||||
} |
||||
|
||||
if c.httpCredentials != nil { |
||||
req.SetBasicAuth(c.httpCredentials.username, c.httpCredentials.password) |
||||
} |
||||
|
||||
resp, err := c.client.Do(req) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
io.Copy(ioutil.Discard, resp.Body) |
||||
resp.Body.Close() |
||||
if resp.StatusCode >= http.StatusBadRequest { |
||||
return fmt.Errorf("error from collector: %d", resp.StatusCode) |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func serializeThrift(obj thrift.TStruct) (*bytes.Buffer, error) { |
||||
t := thrift.NewTMemoryBuffer() |
||||
p := thrift.NewTBinaryProtocolTransport(t) |
||||
if err := obj.Write(p); err != nil { |
||||
return nil, err |
||||
} |
||||
return t.Buffer, nil |
||||
} |
||||
|
||||
func BuildJaegerThrift(span *Span) *j.Span { |
||||
span.Lock() |
||||
defer span.Unlock() |
||||
startTime := span.startTime.UnixNano() / 1000 |
||||
duration := span.duration.Nanoseconds() / int64(time.Microsecond) |
||||
jaegerSpan := &j.Span{ |
||||
TraceIdLow: int64(span.context.traceID.Low), |
||||
TraceIdHigh: int64(span.context.traceID.High), |
||||
SpanId: int64(span.context.spanID), |
||||
ParentSpanId: int64(span.context.parentID), |
||||
OperationName: span.operationName, |
||||
Flags: int32(span.context.samplingState.flags()), |
||||
StartTime: startTime, |
||||
Duration: duration, |
||||
Tags: buildTags(span.tags, 100), |
||||
Logs: buildLogs(span.logs), |
||||
References: buildReferences(span.references), |
||||
} |
||||
return jaegerSpan |
||||
} |
||||
|
||||
func stringify(value interface{}) string { |
||||
if s, ok := value.(string); ok { |
||||
return s |
||||
} |
||||
return fmt.Sprintf("%+v", value) |
||||
} |
||||
|
||||
func truncateString(value string, maxLength int) string { |
||||
// we ignore the problem of utf8 runes possibly being sliced in the middle,
|
||||
// as it is rather expensive to iterate through each tag just to find rune
|
||||
// boundaries.
|
||||
if len(value) > maxLength { |
||||
return value[:maxLength] |
||||
} |
||||
return value |
||||
} |
||||
|
||||
func buildTags(tags []Tag, maxTagValueLength int) []*j.Tag { |
||||
jTags := make([]*j.Tag, 0, len(tags)) |
||||
for _, tag := range tags { |
||||
jTag := buildTag(&tag, maxTagValueLength) |
||||
jTags = append(jTags, jTag) |
||||
} |
||||
return jTags |
||||
} |
||||
func buildTag(tag *Tag, maxTagValueLength int) *j.Tag { |
||||
jTag := &j.Tag{Key: tag.key} |
||||
switch value := tag.value.(type) { |
||||
case string: |
||||
vStr := truncateString(value, maxTagValueLength) |
||||
jTag.VStr = &vStr |
||||
jTag.VType = j.TagType_STRING |
||||
case []byte: |
||||
if len(value) > maxTagValueLength { |
||||
value = value[:maxTagValueLength] |
||||
} |
||||
jTag.VBinary = value |
||||
jTag.VType = j.TagType_BINARY |
||||
case int: |
||||
vLong := int64(value) |
||||
jTag.VLong = &vLong |
||||
jTag.VType = j.TagType_LONG |
||||
case uint: |
||||
vLong := int64(value) |
||||
jTag.VLong = &vLong |
||||
jTag.VType = j.TagType_LONG |
||||
case int8: |
||||
vLong := int64(value) |
||||
jTag.VLong = &vLong |
||||
jTag.VType = j.TagType_LONG |
||||
case uint8: |
||||
vLong := int64(value) |
||||
jTag.VLong = &vLong |
||||
jTag.VType = j.TagType_LONG |
||||
case int16: |
||||
vLong := int64(value) |
||||
jTag.VLong = &vLong |
||||
jTag.VType = j.TagType_LONG |
||||
case uint16: |
||||
vLong := int64(value) |
||||
jTag.VLong = &vLong |
||||
jTag.VType = j.TagType_LONG |
||||
case int32: |
||||
vLong := int64(value) |
||||
jTag.VLong = &vLong |
||||
jTag.VType = j.TagType_LONG |
||||
case uint32: |
||||
vLong := int64(value) |
||||
jTag.VLong = &vLong |
||||
jTag.VType = j.TagType_LONG |
||||
case int64: |
||||
vLong := int64(value) |
||||
jTag.VLong = &vLong |
||||
jTag.VType = j.TagType_LONG |
||||
case uint64: |
||||
vLong := int64(value) |
||||
jTag.VLong = &vLong |
||||
jTag.VType = j.TagType_LONG |
||||
case float32: |
||||
vDouble := float64(value) |
||||
jTag.VDouble = &vDouble |
||||
jTag.VType = j.TagType_DOUBLE |
||||
case float64: |
||||
vDouble := float64(value) |
||||
jTag.VDouble = &vDouble |
||||
jTag.VType = j.TagType_DOUBLE |
||||
case bool: |
||||
vBool := value |
||||
jTag.VBool = &vBool |
||||
jTag.VType = j.TagType_BOOL |
||||
default: |
||||
vStr := truncateString(stringify(value), maxTagValueLength) |
||||
jTag.VStr = &vStr |
||||
jTag.VType = j.TagType_STRING |
||||
} |
||||
return jTag |
||||
} |
||||
|
||||
func buildLogs(logs []opentracing.LogRecord) []*j.Log { |
||||
jLogs := make([]*j.Log, 0, len(logs)) |
||||
for _, log := range logs { |
||||
jLog := &j.Log{ |
||||
Timestamp: log.Timestamp.UnixNano() / 1000, |
||||
Fields: ja.ConvertLogsToJaegerTags(log.Fields), |
||||
} |
||||
jLogs = append(jLogs, jLog) |
||||
} |
||||
return jLogs |
||||
} |
||||
|
||||
func buildReferences(references []Reference) []*j.SpanRef { |
||||
retMe := make([]*j.SpanRef, 0, len(references)) |
||||
for _, ref := range references { |
||||
if ref.Type == opentracing.ChildOfRef { |
||||
retMe = append(retMe, spanRef(ref.Context, j.SpanRefType_CHILD_OF)) |
||||
} else if ref.Type == opentracing.FollowsFromRef { |
||||
retMe = append(retMe, spanRef(ref.Context, j.SpanRefType_FOLLOWS_FROM)) |
||||
} |
||||
} |
||||
return retMe |
||||
} |
||||
|
||||
func spanRef(ctx SpanContext, refType j.SpanRefType) *j.SpanRef { |
||||
return &j.SpanRef{ |
||||
RefType: refType, |
||||
TraceIdLow: int64(ctx.traceID.Low), |
||||
TraceIdHigh: int64(ctx.traceID.High), |
||||
SpanId: int64(ctx.spanID), |
||||
} |
||||
} |
@ -0,0 +1,49 @@ |
||||
package jaeger |
||||
|
||||
import ( |
||||
"github.com/go-kratos/kratos/pkg/log" |
||||
"github.com/go-kratos/kratos/pkg/net/trace" |
||||
) |
||||
|
||||
type Config struct { |
||||
Endpoint string |
||||
BatchSize int |
||||
} |
||||
|
||||
type JaegerReporter struct { |
||||
transport *HTTPTransport |
||||
} |
||||
|
||||
func newReport(c *Config) *JaegerReporter { |
||||
transport := NewHTTPTransport(c.Endpoint) |
||||
transport.batchSize = c.BatchSize |
||||
return &JaegerReporter{transport: transport} |
||||
} |
||||
|
||||
func (r *JaegerReporter) WriteSpan(raw *trace.Span) (err error) { |
||||
ctx := raw.Context() |
||||
traceID := TraceID{Low: ctx.TraceID} |
||||
spanID := SpanID(ctx.SpanID) |
||||
parentID := SpanID(ctx.ParentID) |
||||
tags := raw.Tags() |
||||
log.Info("[info] write span") |
||||
span := &Span{ |
||||
context: NewSpanContext(traceID, spanID, parentID, true, nil), |
||||
operationName: raw.OperationName(), |
||||
startTime: raw.StartTime(), |
||||
duration: raw.Duration(), |
||||
} |
||||
|
||||
span.serviceName = raw.ServiceName() |
||||
|
||||
for _, t := range tags { |
||||
span.SetTag(t.Key, t.Value) |
||||
} |
||||
|
||||
r.transport.Append(span) |
||||
return nil |
||||
} |
||||
|
||||
func (rpt *JaegerReporter) Close() error { |
||||
return rpt.transport.Close() |
||||
} |
@ -0,0 +1,53 @@ |
||||
package jaeger |
||||
|
||||
import ( |
||||
"io/ioutil" |
||||
"net/http" |
||||
"net/http/httptest" |
||||
"testing" |
||||
|
||||
"github.com/go-kratos/kratos/pkg/net/trace" |
||||
) |
||||
|
||||
func TestJaegerReporter(t *testing.T) { |
||||
var handler = func(w http.ResponseWriter, r *http.Request) { |
||||
if r.Method != "POST" { |
||||
t.Errorf("expected 'POST' request, got '%s'", r.Method) |
||||
} |
||||
|
||||
aSpanPayload, err := ioutil.ReadAll(r.Body) |
||||
if err != nil { |
||||
t.Errorf("unexpected error: %s", err.Error()) |
||||
} |
||||
|
||||
t.Logf("%s\n", aSpanPayload) |
||||
} |
||||
ht := httptest.NewServer(http.HandlerFunc(handler)) |
||||
defer ht.Close() |
||||
|
||||
c := &Config{ |
||||
Endpoint: ht.URL, |
||||
BatchSize: 1, |
||||
} |
||||
|
||||
//c.Endpoint = "http://127.0.0.1:14268/api/traces"
|
||||
|
||||
report := newReport(c) |
||||
t1 := trace.NewTracer("jaeger_test_1", report, true) |
||||
t2 := trace.NewTracer("jaeger_test_2", report, true) |
||||
sp1 := t1.New("option_1") |
||||
sp2 := sp1.Fork("service3", "opt_client") |
||||
sp2.SetLog(trace.Log("log_k", "log_v")) |
||||
// inject
|
||||
header := make(http.Header) |
||||
t1.Inject(sp2, trace.HTTPFormat, header) |
||||
t.Log(header) |
||||
sp3, err := t2.Extract(trace.HTTPFormat, header) |
||||
if err != nil { |
||||
t.Fatal(err) |
||||
} |
||||
sp3.Finish(nil) |
||||
sp2.Finish(nil) |
||||
sp1.Finish(nil) |
||||
report.Close() |
||||
} |
@ -0,0 +1,9 @@ |
||||
package jaeger |
||||
|
||||
import "github.com/opentracing/opentracing-go" |
||||
|
||||
// Reference represents a causal reference to other Spans (via their SpanContext).
|
||||
type Reference struct { |
||||
Type opentracing.SpanReferenceType |
||||
Context SpanContext |
||||
} |
@ -0,0 +1,345 @@ |
||||
package jaeger |
||||
|
||||
import ( |
||||
"sync" |
||||
"sync/atomic" |
||||
"time" |
||||
|
||||
"github.com/opentracing/opentracing-go" |
||||
"github.com/opentracing/opentracing-go/log" |
||||
) |
||||
|
||||
// Span implements opentracing.Span
|
||||
type Span struct { |
||||
// referenceCounter used to increase the lifetime of
|
||||
// the object before return it into the pool.
|
||||
referenceCounter int32 |
||||
|
||||
serviceName string |
||||
|
||||
sync.RWMutex |
||||
|
||||
// TODO: (breaking change) change to use a pointer
|
||||
context SpanContext |
||||
|
||||
// The name of the "operation" this span is an instance of.
|
||||
// Known as a "span name" in some implementations.
|
||||
operationName string |
||||
|
||||
// firstInProcess, if true, indicates that this span is the root of the (sub)tree
|
||||
// of spans in the current process. In other words it's true for the root spans,
|
||||
// and the ingress spans when the process joins another trace.
|
||||
firstInProcess bool |
||||
|
||||
// startTime is the timestamp indicating when the span began, with microseconds precision.
|
||||
startTime time.Time |
||||
|
||||
// duration returns duration of the span with microseconds precision.
|
||||
// Zero value means duration is unknown.
|
||||
duration time.Duration |
||||
|
||||
// tags attached to this span
|
||||
tags []Tag |
||||
|
||||
// The span's "micro-log"
|
||||
logs []opentracing.LogRecord |
||||
|
||||
// The number of logs dropped because of MaxLogsPerSpan.
|
||||
numDroppedLogs int |
||||
|
||||
// references for this span
|
||||
references []Reference |
||||
} |
||||
|
||||
// Tag is a simple key value wrapper.
|
||||
// TODO (breaking change) deprecate in the next major release, use opentracing.Tag instead.
|
||||
type Tag struct { |
||||
key string |
||||
value interface{} |
||||
} |
||||
|
||||
// NewTag creates a new Tag.
|
||||
// TODO (breaking change) deprecate in the next major release, use opentracing.Tag instead.
|
||||
func NewTag(key string, value interface{}) Tag { |
||||
return Tag{key: key, value: value} |
||||
} |
||||
|
||||
// SetOperationName sets or changes the operation name.
|
||||
func (s *Span) SetOperationName(operationName string) opentracing.Span { |
||||
s.Lock() |
||||
s.operationName = operationName |
||||
s.Unlock() |
||||
return s |
||||
} |
||||
|
||||
// SetTag implements SetTag() of opentracing.Span
|
||||
func (s *Span) SetTag(key string, value interface{}) opentracing.Span { |
||||
return s.setTagInternal(key, value, true) |
||||
} |
||||
|
||||
func (s *Span) setTagInternal(key string, value interface{}, lock bool) opentracing.Span { |
||||
if lock { |
||||
s.Lock() |
||||
defer s.Unlock() |
||||
} |
||||
s.appendTagNoLocking(key, value) |
||||
return s |
||||
} |
||||
|
||||
// SpanContext returns span context
|
||||
func (s *Span) SpanContext() SpanContext { |
||||
s.Lock() |
||||
defer s.Unlock() |
||||
return s.context |
||||
} |
||||
|
||||
// StartTime returns span start time
|
||||
func (s *Span) StartTime() time.Time { |
||||
s.Lock() |
||||
defer s.Unlock() |
||||
return s.startTime |
||||
} |
||||
|
||||
// Duration returns span duration
|
||||
func (s *Span) Duration() time.Duration { |
||||
s.Lock() |
||||
defer s.Unlock() |
||||
return s.duration |
||||
} |
||||
|
||||
// Tags returns tags for span
|
||||
func (s *Span) Tags() opentracing.Tags { |
||||
s.Lock() |
||||
defer s.Unlock() |
||||
var result = make(opentracing.Tags, len(s.tags)) |
||||
for _, tag := range s.tags { |
||||
result[tag.key] = tag.value |
||||
} |
||||
return result |
||||
} |
||||
|
||||
// Logs returns micro logs for span
|
||||
func (s *Span) Logs() []opentracing.LogRecord { |
||||
s.Lock() |
||||
defer s.Unlock() |
||||
|
||||
logs := append([]opentracing.LogRecord(nil), s.logs...) |
||||
if s.numDroppedLogs != 0 { |
||||
fixLogs(logs, s.numDroppedLogs) |
||||
} |
||||
|
||||
return logs |
||||
} |
||||
|
||||
// References returns references for this span
|
||||
func (s *Span) References() []opentracing.SpanReference { |
||||
s.Lock() |
||||
defer s.Unlock() |
||||
|
||||
if s.references == nil || len(s.references) == 0 { |
||||
return nil |
||||
} |
||||
|
||||
result := make([]opentracing.SpanReference, len(s.references)) |
||||
for i, r := range s.references { |
||||
result[i] = opentracing.SpanReference{Type: r.Type, ReferencedContext: r.Context} |
||||
} |
||||
return result |
||||
} |
||||
|
||||
func (s *Span) appendTagNoLocking(key string, value interface{}) { |
||||
s.tags = append(s.tags, Tag{key: key, value: value}) |
||||
} |
||||
|
||||
// LogFields implements opentracing.Span API
|
||||
func (s *Span) LogFields(fields ...log.Field) { |
||||
s.Lock() |
||||
defer s.Unlock() |
||||
if !s.context.IsSampled() { |
||||
return |
||||
} |
||||
s.logFieldsNoLocking(fields...) |
||||
} |
||||
|
||||
// this function should only be called while holding a Write lock
|
||||
func (s *Span) logFieldsNoLocking(fields ...log.Field) { |
||||
lr := opentracing.LogRecord{ |
||||
Fields: fields, |
||||
Timestamp: time.Now(), |
||||
} |
||||
s.appendLogNoLocking(lr) |
||||
} |
||||
|
||||
// LogKV implements opentracing.Span API
|
||||
func (s *Span) LogKV(alternatingKeyValues ...interface{}) { |
||||
s.RLock() |
||||
sampled := s.context.IsSampled() |
||||
s.RUnlock() |
||||
if !sampled { |
||||
return |
||||
} |
||||
fields, err := log.InterleavedKVToFields(alternatingKeyValues...) |
||||
if err != nil { |
||||
s.LogFields(log.Error(err), log.String("function", "LogKV")) |
||||
return |
||||
} |
||||
s.LogFields(fields...) |
||||
} |
||||
|
||||
// LogEvent implements opentracing.Span API
|
||||
func (s *Span) LogEvent(event string) { |
||||
s.Log(opentracing.LogData{Event: event}) |
||||
} |
||||
|
||||
// LogEventWithPayload implements opentracing.Span API
|
||||
func (s *Span) LogEventWithPayload(event string, payload interface{}) { |
||||
s.Log(opentracing.LogData{Event: event, Payload: payload}) |
||||
} |
||||
|
||||
// Log implements opentracing.Span API
|
||||
func (s *Span) Log(ld opentracing.LogData) { |
||||
s.Lock() |
||||
defer s.Unlock() |
||||
if s.context.IsSampled() { |
||||
s.appendLogNoLocking(ld.ToLogRecord()) |
||||
} |
||||
} |
||||
|
||||
// this function should only be called while holding a Write lock
|
||||
func (s *Span) appendLogNoLocking(lr opentracing.LogRecord) { |
||||
maxLogs := 100 |
||||
// We have too many logs. We don't touch the first numOld logs; we treat the
|
||||
// rest as a circular buffer and overwrite the oldest log among those.
|
||||
numOld := (maxLogs - 1) / 2 |
||||
numNew := maxLogs - numOld |
||||
s.logs[numOld+s.numDroppedLogs%numNew] = lr |
||||
s.numDroppedLogs++ |
||||
} |
||||
|
||||
// rotateLogBuffer rotates the records in the buffer: records 0 to pos-1 move at
|
||||
// the end (i.e. pos circular left shifts).
|
||||
func rotateLogBuffer(buf []opentracing.LogRecord, pos int) { |
||||
// This algorithm is described in:
|
||||
// http://www.cplusplus.com/reference/algorithm/rotate
|
||||
for first, middle, next := 0, pos, pos; first != middle; { |
||||
buf[first], buf[next] = buf[next], buf[first] |
||||
first++ |
||||
next++ |
||||
if next == len(buf) { |
||||
next = middle |
||||
} else if first == middle { |
||||
middle = next |
||||
} |
||||
} |
||||
} |
||||
|
||||
func fixLogs(logs []opentracing.LogRecord, numDroppedLogs int) { |
||||
// We dropped some log events, which means that we used part of Logs as a
|
||||
// circular buffer (see appendLog). De-circularize it.
|
||||
numOld := (len(logs) - 1) / 2 |
||||
numNew := len(logs) - numOld |
||||
rotateLogBuffer(logs[numOld:], numDroppedLogs%numNew) |
||||
|
||||
// Replace the log in the middle (the oldest "new" log) with information
|
||||
// about the dropped logs. This means that we are effectively dropping one
|
||||
// more "new" log.
|
||||
numDropped := numDroppedLogs + 1 |
||||
logs[numOld] = opentracing.LogRecord{ |
||||
// Keep the timestamp of the last dropped event.
|
||||
Timestamp: logs[numOld].Timestamp, |
||||
Fields: []log.Field{ |
||||
log.String("event", "dropped Span logs"), |
||||
log.Int("dropped_log_count", numDropped), |
||||
log.String("component", "jaeger-client"), |
||||
}, |
||||
} |
||||
} |
||||
|
||||
func (s *Span) fixLogsIfDropped() { |
||||
if s.numDroppedLogs == 0 { |
||||
return |
||||
} |
||||
fixLogs(s.logs, s.numDroppedLogs) |
||||
s.numDroppedLogs = 0 |
||||
} |
||||
|
||||
// SetBaggageItem implements SetBaggageItem() of opentracing.SpanContext
|
||||
func (s *Span) SetBaggageItem(key, value string) opentracing.Span { |
||||
s.context.baggage[key] = value |
||||
return s |
||||
} |
||||
|
||||
// BaggageItem implements BaggageItem() of opentracing.SpanContext
|
||||
func (s *Span) BaggageItem(key string) string { |
||||
s.RLock() |
||||
defer s.RUnlock() |
||||
return s.context.baggage[key] |
||||
} |
||||
|
||||
// Finish implements opentracing.Span API
|
||||
// After finishing the Span object it returns back to the allocator unless the reporter retains it again,
|
||||
// so after that, the Span object should no longer be used because it won't be valid anymore.
|
||||
func (s *Span) Finish() { |
||||
s.FinishWithOptions(opentracing.FinishOptions{}) |
||||
} |
||||
|
||||
// FinishWithOptions implements opentracing.Span API
|
||||
func (s *Span) FinishWithOptions(options opentracing.FinishOptions) { |
||||
} |
||||
|
||||
// Context implements opentracing.Span API
|
||||
func (s *Span) Context() opentracing.SpanContext { |
||||
s.Lock() |
||||
defer s.Unlock() |
||||
return s.context |
||||
} |
||||
|
||||
// Tracer implements opentracing.Span API
|
||||
func (s *Span) Tracer() opentracing.Tracer { |
||||
return nil |
||||
} |
||||
|
||||
func (s *Span) String() string { |
||||
s.RLock() |
||||
defer s.RUnlock() |
||||
return s.context.String() |
||||
} |
||||
|
||||
// OperationName allows retrieving current operation name.
|
||||
func (s *Span) OperationName() string { |
||||
s.RLock() |
||||
defer s.RUnlock() |
||||
return s.operationName |
||||
} |
||||
|
||||
// Retain increases object counter to increase the lifetime of the object
|
||||
func (s *Span) Retain() *Span { |
||||
atomic.AddInt32(&s.referenceCounter, 1) |
||||
return s |
||||
} |
||||
|
||||
// Release decrements object counter and return to the
|
||||
// allocator manager when counter will below zero
|
||||
func (s *Span) Release() { |
||||
|
||||
} |
||||
|
||||
// reset span state and release unused data
|
||||
func (s *Span) reset() { |
||||
s.firstInProcess = false |
||||
s.context = emptyContext |
||||
s.operationName = "" |
||||
s.startTime = time.Time{} |
||||
s.duration = 0 |
||||
atomic.StoreInt32(&s.referenceCounter, 0) |
||||
|
||||
// Note: To reuse memory we can save the pointers on the heap
|
||||
s.tags = s.tags[:0] |
||||
s.logs = s.logs[:0] |
||||
s.numDroppedLogs = 0 |
||||
s.references = s.references[:0] |
||||
} |
||||
|
||||
func (s *Span) ServiceName() string { |
||||
return s.serviceName |
||||
} |
@ -0,0 +1,369 @@ |
||||
package jaeger |
||||
|
||||
import ( |
||||
"errors" |
||||
"fmt" |
||||
"strconv" |
||||
"strings" |
||||
"sync" |
||||
|
||||
"go.uber.org/atomic" |
||||
) |
||||
|
||||
const ( |
||||
flagSampled = 1 |
||||
flagDebug = 2 |
||||
flagFirehose = 8 |
||||
) |
||||
|
||||
var ( |
||||
errEmptyTracerStateString = errors.New("Cannot convert empty string to tracer state") |
||||
errMalformedTracerStateString = errors.New("String does not match tracer state format") |
||||
|
||||
emptyContext = SpanContext{} |
||||
) |
||||
|
||||
// TraceID represents unique 128bit identifier of a trace
|
||||
type TraceID struct { |
||||
High, Low uint64 |
||||
} |
||||
|
||||
// SpanID represents unique 64bit identifier of a span
|
||||
type SpanID uint64 |
||||
|
||||
// SpanContext represents propagated span identity and state
|
||||
type SpanContext struct { |
||||
// traceID represents globally unique ID of the trace.
|
||||
// Usually generated as a random number.
|
||||
traceID TraceID |
||||
|
||||
// spanID represents span ID that must be unique within its trace,
|
||||
// but does not have to be globally unique.
|
||||
spanID SpanID |
||||
|
||||
// parentID refers to the ID of the parent span.
|
||||
// Should be 0 if the current span is a root span.
|
||||
parentID SpanID |
||||
|
||||
// Distributed Context baggage. The is a snapshot in time.
|
||||
baggage map[string]string |
||||
|
||||
// debugID can be set to some correlation ID when the context is being
|
||||
// extracted from a TextMap carrier.
|
||||
//
|
||||
// See JaegerDebugHeader in constants.go
|
||||
debugID string |
||||
|
||||
// samplingState is shared across all spans
|
||||
samplingState *samplingState |
||||
|
||||
// remote indicates that span context represents a remote parent
|
||||
remote bool |
||||
} |
||||
|
||||
type samplingState struct { |
||||
// Span context's state flags that are propagated across processes. Only lower 8 bits are used.
|
||||
// We use an int32 instead of byte to be able to use CAS operations.
|
||||
stateFlags atomic.Int32 |
||||
|
||||
// When state is not final, sampling will be retried on other span write operations,
|
||||
// like SetOperationName / SetTag, and the spans will remain writable.
|
||||
final atomic.Bool |
||||
|
||||
// localRootSpan stores the SpanID of the first span created in this process for a given trace.
|
||||
localRootSpan SpanID |
||||
|
||||
// extendedState allows samplers to keep intermediate state.
|
||||
// The keys and values in this map are completely opaque: interface{} -> interface{}.
|
||||
extendedState sync.Map |
||||
} |
||||
|
||||
func (s *samplingState) isLocalRootSpan(id SpanID) bool { |
||||
return id == s.localRootSpan |
||||
} |
||||
|
||||
func (s *samplingState) setFlag(newFlag int32) { |
||||
swapped := false |
||||
for !swapped { |
||||
old := s.stateFlags.Load() |
||||
swapped = s.stateFlags.CAS(old, old|newFlag) |
||||
} |
||||
} |
||||
|
||||
func (s *samplingState) unsetFlag(newFlag int32) { |
||||
swapped := false |
||||
for !swapped { |
||||
old := s.stateFlags.Load() |
||||
swapped = s.stateFlags.CAS(old, old&^newFlag) |
||||
} |
||||
} |
||||
|
||||
func (s *samplingState) setSampled() { |
||||
s.setFlag(flagSampled) |
||||
} |
||||
|
||||
func (s *samplingState) unsetSampled() { |
||||
s.unsetFlag(flagSampled) |
||||
} |
||||
|
||||
func (s *samplingState) setDebugAndSampled() { |
||||
s.setFlag(flagDebug | flagSampled) |
||||
} |
||||
|
||||
func (s *samplingState) setFirehose() { |
||||
s.setFlag(flagFirehose) |
||||
} |
||||
|
||||
func (s *samplingState) setFlags(flags byte) { |
||||
s.stateFlags.Store(int32(flags)) |
||||
} |
||||
|
||||
func (s *samplingState) setFinal() { |
||||
s.final.Store(true) |
||||
} |
||||
|
||||
func (s *samplingState) flags() byte { |
||||
return byte(s.stateFlags.Load()) |
||||
} |
||||
|
||||
func (s *samplingState) isSampled() bool { |
||||
return s.stateFlags.Load()&flagSampled == flagSampled |
||||
} |
||||
|
||||
func (s *samplingState) isDebug() bool { |
||||
return s.stateFlags.Load()&flagDebug == flagDebug |
||||
} |
||||
|
||||
func (s *samplingState) isFirehose() bool { |
||||
return s.stateFlags.Load()&flagFirehose == flagFirehose |
||||
} |
||||
|
||||
func (s *samplingState) isFinal() bool { |
||||
return s.final.Load() |
||||
} |
||||
|
||||
func (s *samplingState) extendedStateForKey(key interface{}, initValue func() interface{}) interface{} { |
||||
if value, ok := s.extendedState.Load(key); ok { |
||||
return value |
||||
} |
||||
value := initValue() |
||||
value, _ = s.extendedState.LoadOrStore(key, value) |
||||
return value |
||||
} |
||||
|
||||
// ForeachBaggageItem implements ForeachBaggageItem() of opentracing.SpanContext
|
||||
func (c SpanContext) ForeachBaggageItem(handler func(k, v string) bool) { |
||||
for k, v := range c.baggage { |
||||
if !handler(k, v) { |
||||
break |
||||
} |
||||
} |
||||
} |
||||
|
||||
// IsSampled returns whether this trace was chosen for permanent storage
|
||||
// by the sampling mechanism of the tracer.
|
||||
func (c SpanContext) IsSampled() bool { |
||||
return c.samplingState.isSampled() |
||||
} |
||||
|
||||
// IsDebug indicates whether sampling was explicitly requested by the service.
|
||||
func (c SpanContext) IsDebug() bool { |
||||
return c.samplingState.isDebug() |
||||
} |
||||
|
||||
// IsSamplingFinalized indicates whether the sampling decision has been finalized.
|
||||
func (c SpanContext) IsSamplingFinalized() bool { |
||||
return c.samplingState.isFinal() |
||||
} |
||||
|
||||
// IsFirehose indicates whether the firehose flag was set
|
||||
func (c SpanContext) IsFirehose() bool { |
||||
return c.samplingState.isFirehose() |
||||
} |
||||
|
||||
// ExtendedSamplingState returns the custom state object for a given key. If the value for this key does not exist,
|
||||
// it is initialized via initValue function. This state can be used by samplers (e.g. x.PrioritySampler).
|
||||
func (c SpanContext) ExtendedSamplingState(key interface{}, initValue func() interface{}) interface{} { |
||||
return c.samplingState.extendedStateForKey(key, initValue) |
||||
} |
||||
|
||||
// IsValid indicates whether this context actually represents a valid trace.
|
||||
func (c SpanContext) IsValid() bool { |
||||
return c.traceID.IsValid() && c.spanID != 0 |
||||
} |
||||
|
||||
// SetFirehose enables firehose mode for this trace.
|
||||
func (c SpanContext) SetFirehose() { |
||||
c.samplingState.setFirehose() |
||||
} |
||||
|
||||
func (c SpanContext) String() string { |
||||
if c.traceID.High == 0 { |
||||
return fmt.Sprintf("%016x:%016x:%016x:%x", c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState.stateFlags.Load()) |
||||
} |
||||
return fmt.Sprintf("%016x%016x:%016x:%016x:%x", c.traceID.High, c.traceID.Low, uint64(c.spanID), uint64(c.parentID), c.samplingState.stateFlags.Load()) |
||||
} |
||||
|
||||
// ContextFromString reconstructs the Context encoded in a string
|
||||
func ContextFromString(value string) (SpanContext, error) { |
||||
var context SpanContext |
||||
if value == "" { |
||||
return emptyContext, errEmptyTracerStateString |
||||
} |
||||
parts := strings.Split(value, ":") |
||||
if len(parts) != 4 { |
||||
return emptyContext, errMalformedTracerStateString |
||||
} |
||||
var err error |
||||
if context.traceID, err = TraceIDFromString(parts[0]); err != nil { |
||||
return emptyContext, err |
||||
} |
||||
if context.spanID, err = SpanIDFromString(parts[1]); err != nil { |
||||
return emptyContext, err |
||||
} |
||||
if context.parentID, err = SpanIDFromString(parts[2]); err != nil { |
||||
return emptyContext, err |
||||
} |
||||
flags, err := strconv.ParseUint(parts[3], 10, 8) |
||||
if err != nil { |
||||
return emptyContext, err |
||||
} |
||||
context.samplingState = &samplingState{} |
||||
context.samplingState.setFlags(byte(flags)) |
||||
return context, nil |
||||
} |
||||
|
||||
// TraceID returns the trace ID of this span context
|
||||
func (c SpanContext) TraceID() TraceID { |
||||
return c.traceID |
||||
} |
||||
|
||||
// SpanID returns the span ID of this span context
|
||||
func (c SpanContext) SpanID() SpanID { |
||||
return c.spanID |
||||
} |
||||
|
||||
// ParentID returns the parent span ID of this span context
|
||||
func (c SpanContext) ParentID() SpanID { |
||||
return c.parentID |
||||
} |
||||
|
||||
// Flags returns the bitmap containing such bits as 'sampled' and 'debug'.
|
||||
func (c SpanContext) Flags() byte { |
||||
return c.samplingState.flags() |
||||
} |
||||
|
||||
// NewSpanContext creates a new instance of SpanContext
|
||||
func NewSpanContext(traceID TraceID, spanID, parentID SpanID, sampled bool, baggage map[string]string) SpanContext { |
||||
samplingState := &samplingState{} |
||||
if sampled { |
||||
samplingState.setSampled() |
||||
} |
||||
|
||||
return SpanContext{ |
||||
traceID: traceID, |
||||
spanID: spanID, |
||||
parentID: parentID, |
||||
samplingState: samplingState, |
||||
baggage: baggage} |
||||
} |
||||
|
||||
// CopyFrom copies data from ctx into this context, including span identity and baggage.
|
||||
// TODO This is only used by interop.go. Remove once TChannel Go supports OpenTracing.
|
||||
func (c *SpanContext) CopyFrom(ctx *SpanContext) { |
||||
c.traceID = ctx.traceID |
||||
c.spanID = ctx.spanID |
||||
c.parentID = ctx.parentID |
||||
c.samplingState = ctx.samplingState |
||||
if l := len(ctx.baggage); l > 0 { |
||||
c.baggage = make(map[string]string, l) |
||||
for k, v := range ctx.baggage { |
||||
c.baggage[k] = v |
||||
} |
||||
} else { |
||||
c.baggage = nil |
||||
} |
||||
} |
||||
|
||||
// WithBaggageItem creates a new context with an extra baggage item.
|
||||
func (c SpanContext) WithBaggageItem(key, value string) SpanContext { |
||||
var newBaggage map[string]string |
||||
if c.baggage == nil { |
||||
newBaggage = map[string]string{key: value} |
||||
} else { |
||||
newBaggage = make(map[string]string, len(c.baggage)+1) |
||||
for k, v := range c.baggage { |
||||
newBaggage[k] = v |
||||
} |
||||
newBaggage[key] = value |
||||
} |
||||
// Use positional parameters so the compiler will help catch new fields.
|
||||
return SpanContext{c.traceID, c.spanID, c.parentID, newBaggage, "", c.samplingState, c.remote} |
||||
} |
||||
|
||||
// isDebugIDContainerOnly returns true when the instance of the context is only
|
||||
// used to return the debug/correlation ID from extract() method. This happens
|
||||
// in the situation when "jaeger-debug-id" header is passed in the carrier to
|
||||
// the extract() method, but the request otherwise has no span context in it.
|
||||
// Previously this would've returned opentracing.ErrSpanContextNotFound from the
|
||||
// extract method, but now it returns a dummy context with only debugID filled in.
|
||||
//
|
||||
// See JaegerDebugHeader in constants.go
|
||||
// See TextMapPropagator#Extract
|
||||
func (c *SpanContext) isDebugIDContainerOnly() bool { |
||||
return !c.traceID.IsValid() && c.debugID != "" |
||||
} |
||||
|
||||
// ------- TraceID -------
|
||||
|
||||
func (t TraceID) String() string { |
||||
if t.High == 0 { |
||||
return fmt.Sprintf("%x", t.Low) |
||||
} |
||||
return fmt.Sprintf("%x%016x", t.High, t.Low) |
||||
} |
||||
|
||||
// TraceIDFromString creates a TraceID from a hexadecimal string
|
||||
func TraceIDFromString(s string) (TraceID, error) { |
||||
var hi, lo uint64 |
||||
var err error |
||||
if len(s) > 32 { |
||||
return TraceID{}, fmt.Errorf("TraceID cannot be longer than 32 hex characters: %s", s) |
||||
} else if len(s) > 16 { |
||||
hiLen := len(s) - 16 |
||||
if hi, err = strconv.ParseUint(s[0:hiLen], 16, 64); err != nil { |
||||
return TraceID{}, err |
||||
} |
||||
if lo, err = strconv.ParseUint(s[hiLen:], 16, 64); err != nil { |
||||
return TraceID{}, err |
||||
} |
||||
} else { |
||||
if lo, err = strconv.ParseUint(s, 16, 64); err != nil { |
||||
return TraceID{}, err |
||||
} |
||||
} |
||||
return TraceID{High: hi, Low: lo}, nil |
||||
} |
||||
|
||||
// IsValid checks if the trace ID is valid, i.e. not zero.
|
||||
func (t TraceID) IsValid() bool { |
||||
return t.High != 0 || t.Low != 0 |
||||
} |
||||
|
||||
// ------- SpanID -------
|
||||
|
||||
func (s SpanID) String() string { |
||||
return fmt.Sprintf("%x", uint64(s)) |
||||
} |
||||
|
||||
// SpanIDFromString creates a SpanID from a hexadecimal string
|
||||
func SpanIDFromString(s string) (SpanID, error) { |
||||
if len(s) > 16 { |
||||
return SpanID(0), fmt.Errorf("SpanID cannot be longer than 16 hex characters: %s", s) |
||||
} |
||||
id, err := strconv.ParseUint(s, 16, 64) |
||||
if err != nil { |
||||
return SpanID(0), err |
||||
} |
||||
return SpanID(id), nil |
||||
} |
Loading…
Reference in new issue