commit
4d47b09111
@ -0,0 +1,30 @@ |
||||
package zipkin |
||||
|
||||
import ( |
||||
"time" |
||||
|
||||
"github.com/bilibili/kratos/pkg/conf/env" |
||||
"github.com/bilibili/kratos/pkg/net/trace" |
||||
xtime "github.com/bilibili/kratos/pkg/time" |
||||
) |
||||
|
||||
// Config config.
|
||||
// url should be the endpoint to send the spans to, e.g.
|
||||
// http://localhost:9411/api/v2/spans
|
||||
type Config struct { |
||||
Endpoint string `dsn:"endpoint"` |
||||
BatchSize int `dsn:"query.batch_size,100"` |
||||
Timeout xtime.Duration `dsn:"query.timeout,200ms"` |
||||
DisableSample bool `dsn:"query.disable_sample"` |
||||
} |
||||
|
||||
// Init init trace report.
|
||||
func Init(c *Config) { |
||||
if c.BatchSize == 0 { |
||||
c.BatchSize = 100 |
||||
} |
||||
if c.Timeout == 0 { |
||||
c.Timeout = xtime.Duration(200 * time.Millisecond) |
||||
} |
||||
trace.SetGlobalTracer(trace.NewTracer(env.AppID, newReport(c), c.DisableSample)) |
||||
} |
@ -0,0 +1,79 @@ |
||||
package zipkin |
||||
|
||||
import ( |
||||
"fmt" |
||||
"time" |
||||
|
||||
"github.com/bilibili/kratos/pkg/net/trace" |
||||
"github.com/openzipkin/zipkin-go/model" |
||||
"github.com/openzipkin/zipkin-go/reporter" |
||||
"github.com/openzipkin/zipkin-go/reporter/http" |
||||
) |
||||
|
||||
type report struct { |
||||
rpt reporter.Reporter |
||||
} |
||||
|
||||
func newReport(c *Config) *report { |
||||
return &report{ |
||||
rpt: http.NewReporter(c.Endpoint, |
||||
http.Timeout(time.Duration(c.Timeout)), |
||||
http.BatchSize(c.BatchSize), |
||||
), |
||||
} |
||||
} |
||||
|
||||
// WriteSpan write a trace span to queue.
|
||||
func (r *report) WriteSpan(raw *trace.Span) (err error) { |
||||
ctx := raw.Context() |
||||
traceID := model.TraceID{Low: ctx.TraceID} |
||||
spanID := model.ID(ctx.SpanID) |
||||
parentID := model.ID(ctx.ParentID) |
||||
tags := raw.Tags() |
||||
logs := raw.Logs() |
||||
span := model.SpanModel{ |
||||
SpanContext: model.SpanContext{ |
||||
TraceID: traceID, |
||||
ID: spanID, |
||||
ParentID: &parentID, |
||||
}, |
||||
Name: raw.Name(), |
||||
Timestamp: raw.StartTime(), |
||||
Duration: raw.Duration(), |
||||
Tags: make(map[string]string, len(tags)+len(logs)), |
||||
} |
||||
for _, tag := range tags { |
||||
switch tag.Key { |
||||
case trace.TagSpanKind: |
||||
switch tag.Value.(string) { |
||||
case "client": |
||||
span.Kind = model.Client |
||||
case "server": |
||||
span.Kind = model.Server |
||||
case "producer": |
||||
span.Kind = model.Producer |
||||
case "consumer": |
||||
span.Kind = model.Consumer |
||||
} |
||||
case trace.TagPeerService: |
||||
span.LocalEndpoint = &model.Endpoint{ServiceName: tag.Value.(string)} |
||||
default: |
||||
v, ok := tag.Value.(string) |
||||
if ok { |
||||
span.Tags[tag.Key] = v |
||||
} else { |
||||
span.Tags[tag.Key] = fmt.Sprint(v) |
||||
} |
||||
} |
||||
} |
||||
for _, lg := range logs { |
||||
span.Tags[lg.Key] = string(lg.Value) |
||||
} |
||||
r.rpt.Send(span) |
||||
return |
||||
} |
||||
|
||||
// Close close the report.
|
||||
func (r *report) Close() error { |
||||
return r.rpt.Close() |
||||
} |
@ -0,0 +1,52 @@ |
||||
package zipkin |
||||
|
||||
import ( |
||||
"io/ioutil" |
||||
"net/http" |
||||
"net/http/httptest" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/bilibili/kratos/pkg/net/trace" |
||||
xtime "github.com/bilibili/kratos/pkg/time" |
||||
) |
||||
|
||||
func TestZipkin(t *testing.T) { |
||||
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { |
||||
if r.Method != "POST" { |
||||
t.Errorf("expected 'POST' request, got '%s'", r.Method) |
||||
} |
||||
|
||||
aSpanPayload, err := ioutil.ReadAll(r.Body) |
||||
if err != nil { |
||||
t.Errorf("unexpected error: %s", err.Error()) |
||||
} |
||||
|
||||
t.Logf("%s\n", aSpanPayload) |
||||
})) |
||||
defer ts.Close() |
||||
|
||||
c := &Config{ |
||||
Endpoint: ts.URL, |
||||
Timeout: xtime.Duration(time.Second * 5), |
||||
BatchSize: 100, |
||||
} |
||||
//c.Endpoint = "http://127.0.0.1:9411/api/v2/spans"
|
||||
report := newReport(c) |
||||
t1 := trace.NewTracer("service1", report, true) |
||||
t2 := trace.NewTracer("service2", report, true) |
||||
sp1 := t1.New("option_1") |
||||
sp2 := sp1.Fork("service3", "opt_client") |
||||
// inject
|
||||
header := make(http.Header) |
||||
t1.Inject(sp2, trace.HTTPFormat, header) |
||||
t.Log(header) |
||||
sp3, err := t2.Extract(trace.HTTPFormat, header) |
||||
if err != nil { |
||||
t.Fatal(err) |
||||
} |
||||
sp3.Finish(nil) |
||||
sp2.Finish(nil) |
||||
sp1.Finish(nil) |
||||
report.Close() |
||||
} |
Loading…
Reference in new issue