From f6629f235d6238d5d64e84b7bd843eb885b31dcd Mon Sep 17 00:00:00 2001 From: Tony Date: Mon, 15 Jul 2019 19:56:47 +0800 Subject: [PATCH] add zipkin trace --- go.mod | 1 + go.sum | 25 ++++++++++- pkg/net/trace/config.go | 4 +- pkg/net/trace/dapper.go | 42 +++++++++---------- pkg/net/trace/dapper_test.go | 22 +++++----- pkg/net/trace/marshal.go | 4 +- pkg/net/trace/marshal_test.go | 4 +- pkg/net/trace/report.go | 4 +- pkg/net/trace/span.go | 53 ++++++++++++++++++------ pkg/net/trace/span_test.go | 18 ++++---- pkg/net/trace/util_test.go | 2 +- pkg/net/trace/zipkin/config.go | 30 ++++++++++++++ pkg/net/trace/zipkin/zipkin.go | 64 +++++++++++++++++++++++++++++ pkg/net/trace/zipkin/zipkin_test.go | 33 +++++++++++++++ 14 files changed, 242 insertions(+), 64 deletions(-) create mode 100644 pkg/net/trace/zipkin/config.go create mode 100644 pkg/net/trace/zipkin/zipkin.go create mode 100644 pkg/net/trace/zipkin/zipkin_test.go diff --git a/go.mod b/go.mod index 714f2934f..d21eb3e15 100644 --- a/go.mod +++ b/go.mod @@ -23,6 +23,7 @@ require ( github.com/mattn/go-colorable v0.0.9 // indirect github.com/mattn/go-isatty v0.0.4 // indirect github.com/montanaflynn/stats v0.5.0 + github.com/openzipkin/zipkin-go v0.2.0 github.com/pkg/errors v0.8.1 github.com/prometheus/client_golang v0.9.2 github.com/remyoudompheng/bigfft v0.0.0-20190321074620-2f0d2b0e0001 // indirect diff --git a/go.sum b/go.sum index b0075cdbd..72b08b006 100644 --- a/go.sum +++ b/go.sum @@ -1,5 +1,7 @@ github.com/BurntSushi/toml v0.3.1 h1:WXkYYl6Yr3qBf1K79EBnL4mak0OimBfB0XUf9Vl28OQ= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= +github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo= +github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/aristanetworks/goarista v0.0.0-20190409234242-46f4bc7b73ef h1:ajsnF5qTstiBlP+V/mgh91zZfoKP477KfSmRoCoyYGU= github.com/aristanetworks/goarista v0.0.0-20190409234242-46f4bc7b73ef/go.mod h1:D/tb0zPVXnP7fmsLZjtdUhSsumbK/ij54UXjjVgMGxQ= github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973 h1:xJ4a3vCFaGF/jqvzLMYoU8P317H5OQ+Via4RmuPwCS0= @@ -18,6 +20,9 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/dgryski/go-farm v0.0.0-20190323231341-8198c7b169ec h1:sElGDs3V8VdCxH5tWi0ycWJzteOPLJ3HtItSSKI95PY= github.com/dgryski/go-farm v0.0.0-20190323231341-8198c7b169ec/go.mod h1:SqUrOPUnsFjfmXRMNPybcSiG0BgUW2AuFH8PAnS2iTw= +github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= +github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/fatih/color v1.7.0 h1:DkWD4oS2D8LGGgTQ6IvwJJXSL5Vp2ffcQg58nFV38Ys= github.com/fatih/color v1.7.0/go.mod h1:Zm6kSWBoL9eyXnKyktHP6abPY2pDugNf5KwzbycvMj4= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= @@ -47,6 +52,7 @@ github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= github.com/golang/protobuf v1.3.1 h1:YF8+flBXS5eO826T4nzqPrxfhQThhXl0YzfuUPu4SBg= github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/sync v0.0.0-20181108010431-42b317875d0f h1:vuwODIDRvDgwjIl6VTMf0c1Z9uVMUUxiu6UPUjiGhD4= github.com/golang/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:YCHYtYb9c8Q7XgYVYjmJBPtFPKx5QvOcPxHZWjldabE= github.com/golang/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:GJexUf2QgFNvMR9sjJ1iqs+2TxZqJko+Muhnu04tPuU= @@ -61,8 +67,11 @@ github.com/google/go-genproto v0.0.0-20180817151627-c66870c02cf8 h1:I9PuChzQA31g github.com/google/go-genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:3Rcd9jSoLVkV/osPrt5CogLvLiarfI8U9/x78NwhuDU= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/googleapis/google-cloud-go v0.26.0/go.mod h1:yJoOdPPE9UpqbamBhJvp7Ur6OUPPV4rUY3RnssPGNBA= +github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= +github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= github.com/grpc/grpc-go v1.20.1 h1:pk72GtSPpOdZDTkPneppDMGW10HYPC7RqNJT/JvUpV0= github.com/grpc/grpc-go v1.20.1/go.mod h1:10oTOabMzJvdu6/UiuZezV6QK5dSlG84ov/aaiqXj38= +github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= github.com/konsorten/go-windows-terminal-sequences v1.0.1 h1:mweAR1A6xJ3oS2pRaGiHgQ4OO8tzTaLawm8vnODuwDk= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= @@ -80,8 +89,15 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1 h1:4hp9jkHxhMHkqkrB3Ix0j github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/montanaflynn/stats v0.5.0 h1:2EkzeTSqBB4V4bJwWrt5gIIrZmpJBcoIRGS2kWLgzmk= github.com/montanaflynn/stats v0.5.0/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= +github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= +github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= +github.com/openzipkin/zipkin-go v0.2.0 h1:33/f6xXB6YlOQ9tgTsXVOkdLCJsHTcZJnMy4DnSd6FU= +github.com/openzipkin/zipkin-go v0.2.0/go.mod h1:NaW6tEwdmWMaCDZzg8sh+IBNOxHMPnhQw8ySjnjRyN4= +github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/profile v1.2.1/go.mod h1:hJw3o1OdXxsrSjjVksARp5W95eeEaEfptyVZyv6JUPA= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/prometheus/client_golang v0.9.2 h1:awm861/B8OKDd2I/6o1dy3ra4BamzKhYOiGItCeZ740= @@ -92,6 +108,7 @@ github.com/prometheus/common v0.0.0-20181126121408-4724e9255275 h1:PnBWHBf+6L0jO github.com/prometheus/common v0.0.0-20181126121408-4724e9255275/go.mod h1:daVV7qP5qjZbuso7PdcryaAu0sAZbrN9i7WWcTMWvro= github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a h1:9a8MnZMP0X2nLJdBg+pBmGgkJlSaKC2KaQmTCk1XDtE= github.com/prometheus/procfs v0.0.0-20181204211112-1dc9a6cbc91a/go.mod h1:c3At6R/oaqEKCNdg8wHV1ftS6bRYblBhIjjI8uT2IGk= +github.com/rcrowley/go-metrics v0.0.0-20181016184325-3113b8401b8a/go.mod h1:bCqnVzQkZxMG4s8nGwiZ5l3QUCyqpo9Y+/ZMZ9VjZe4= github.com/remyoudompheng/bigfft v0.0.0-20190321074620-2f0d2b0e0001 h1:YDeskXpkNDhPdWN3REluVa46HQOVuVkjkd2sWnrABNQ= github.com/remyoudompheng/bigfft v0.0.0-20190321074620-2f0d2b0e0001/go.mod h1:qqbHyh8v60DhA7CoWK5oRCqLrMHRGoxYCSS9EjAz6Eo= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= @@ -103,6 +120,7 @@ github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726 h1:xT+JlYxNGqyT+XcU8 github.com/siddontang/go v0.0.0-20180604090527-bdc77568d726/go.mod h1:3yhqj7WBBfRhbBlzyOC3gUxftwsU0u8gqevxwIHQpMw= github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k= github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= +github.com/streadway/amqp v0.0.0-20190404075320-75d898a42a94/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= @@ -112,16 +130,19 @@ github.com/tsuna/gohbase v0.0.0-20190201102810-d3184c1526df h1:jYiwqXfoRWU6pJMzC github.com/tsuna/gohbase v0.0.0-20190201102810-d3184c1526df/go.mod h1:3HfLQly3YNLGxNv/2YOfmz30vcjG9hbuME1GpxoLlGs= github.com/urfave/cli v1.20.0 h1:fDqGv3UG/4jbVl/QkFwEdddtEDjh/5Ov6X+0B/3bPaw= github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKGUJ2LatrhH/nqhxcFungHvyanc= golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/go-playground/assert.v1 v1.2.1 h1:xoYuJVE7KT85PYWrN730RguIQO0ePzVRfFMXadIrXTM= gopkg.in/go-playground/assert.v1 v1.2.1/go.mod h1:9RXL0bg/zibRAgZUYszZSwO/z8Y/a8bDuhia5mkpMnE= gopkg.in/go-playground/validator.v9 v9.26.0 h1:2NPPsBpD0ZoxshmLWewQru8rWmbT5JqSzz9D1ZrAjYQ= gopkg.in/go-playground/validator.v9 v9.26.0/go.mod h1:+c9/zcJMFNgbLvly1L1V+PpxWdVbfP1avr/N00E2vyQ= +gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pkg/net/trace/config.go b/pkg/net/trace/config.go index c43e09e71..7741dc147 100644 --- a/pkg/net/trace/config.go +++ b/pkg/net/trace/config.go @@ -58,7 +58,7 @@ func TracerFromEnvFlag() (Tracer, error) { return nil, err } report := newReport(cfg.Network, cfg.Addr, time.Duration(cfg.Timeout), cfg.ProtocolVersion) - return newTracer(env.AppID, report, cfg), nil + return NewTracer(env.AppID, report, cfg.DisableSample), nil } // Init init trace report. @@ -71,5 +71,5 @@ func Init(cfg *Config) { } } report := newReport(cfg.Network, cfg.Addr, time.Duration(cfg.Timeout), cfg.ProtocolVersion) - SetGlobalTracer(newTracer(env.AppID, report, cfg)) + SetGlobalTracer(NewTracer(env.AppID, report, cfg.DisableSample)) } diff --git a/pkg/net/trace/dapper.go b/pkg/net/trace/dapper.go index 5a1675ddf..03048da19 100644 --- a/pkg/net/trace/dapper.go +++ b/pkg/net/trace/dapper.go @@ -8,21 +8,21 @@ import ( ) const ( - _maxLevel = 64 + _maxLevel = 64 + // hard code reset probability at 0.00025, 1/4000 _probability = 0.00025 ) -func newTracer(serviceName string, report reporter, cfg *Config) Tracer { - // hard code reset probability at 0.00025, 1/4000 - cfg.Probability = _probability - sampler := newSampler(cfg.Probability) +// NewTracer new a tracer. +func NewTracer(serviceName string, report reporter, disableSample bool) Tracer { + sampler := newSampler(_probability) // default internal tags tags := extendTag() stdlog := log.New(os.Stderr, "trace", log.LstdFlags) return &dapper{ - cfg: cfg, - serviceName: serviceName, + serviceName: serviceName, + disableSample: disableSample, propagators: map[interface{}]propagator{ HTTPFormat: httpPropagator{}, GRPCFormat: grpcPropagator{}, @@ -30,20 +30,20 @@ func newTracer(serviceName string, report reporter, cfg *Config) Tracer { reporter: report, sampler: sampler, tags: tags, - pool: &sync.Pool{New: func() interface{} { return new(span) }}, + pool: &sync.Pool{New: func() interface{} { return new(Span) }}, stdlog: stdlog, } } type dapper struct { - cfg *Config - serviceName string - tags []Tag - reporter reporter - propagators map[interface{}]propagator - pool *sync.Pool - stdlog *log.Logger - sampler sampler + serviceName string + disableSample bool + tags []Tag + reporter reporter + propagators map[interface{}]propagator + pool *sync.Pool + stdlog *log.Logger + sampler sampler } func (d *dapper) New(operationName string, opts ...Option) Trace { @@ -54,7 +54,7 @@ func (d *dapper) New(operationName string, opts ...Option) Trace { traceID := genID() var sampled bool var probability float32 - if d.cfg.DisableSample { + if d.disableSample { sampled = true probability = 1 } else { @@ -160,7 +160,7 @@ func (d *dapper) Close() error { return d.reporter.Close() } -func (d *dapper) report(sp *span) { +func (d *dapper) report(sp *Span) { if sp.context.isSampled() { if err := d.reporter.WriteSpan(sp); err != nil { d.stdlog.Printf("marshal trace span error: %s", err) @@ -169,7 +169,7 @@ func (d *dapper) report(sp *span) { d.putSpan(sp) } -func (d *dapper) putSpan(sp *span) { +func (d *dapper) putSpan(sp *Span) { if len(sp.tags) > 32 { sp.tags = nil } @@ -179,8 +179,8 @@ func (d *dapper) putSpan(sp *span) { d.pool.Put(sp) } -func (d *dapper) getSpan() *span { - sp := d.pool.Get().(*span) +func (d *dapper) getSpan() *Span { + sp := d.pool.Get().(*Span) sp.dapper = d sp.childs = 0 sp.tags = sp.tags[:0] diff --git a/pkg/net/trace/dapper_test.go b/pkg/net/trace/dapper_test.go index f5e3ab7da..ce9517c7c 100644 --- a/pkg/net/trace/dapper_test.go +++ b/pkg/net/trace/dapper_test.go @@ -10,10 +10,10 @@ import ( ) type mockReport struct { - sps []*span + sps []*Span } -func (m *mockReport) WriteSpan(sp *span) error { +func (m *mockReport) WriteSpan(sp *Span) error { m.sps = append(m.sps, sp) return nil } @@ -25,8 +25,8 @@ func (m *mockReport) Close() error { func TestDapperPropagation(t *testing.T) { t.Run("test HTTP progagation", func(t *testing.T) { report := &mockReport{} - t1 := newTracer("service1", report, &Config{DisableSample: true}) - t2 := newTracer("service2", report, &Config{DisableSample: true}) + t1 := NewTracer("service1", report, true) + t2 := NewTracer("service2", report, true) sp1 := t1.New("opt_1") sp2 := sp1.Fork("", "opt_client") header := make(http.Header) @@ -49,8 +49,8 @@ func TestDapperPropagation(t *testing.T) { }) t.Run("test gRPC progagation", func(t *testing.T) { report := &mockReport{} - t1 := newTracer("service1", report, &Config{DisableSample: true}) - t2 := newTracer("service2", report, &Config{DisableSample: true}) + t1 := NewTracer("service1", report, true) + t2 := NewTracer("service2", report, true) sp1 := t1.New("opt_1") sp2 := sp1.Fork("", "opt_client") md := make(metadata.MD) @@ -73,14 +73,14 @@ func TestDapperPropagation(t *testing.T) { }) t.Run("test normal", func(t *testing.T) { report := &mockReport{} - t1 := newTracer("service1", report, &Config{Probability: 0.000000001}) + t1 := NewTracer("service1", report, true) sp1 := t1.New("test123") sp1.Finish(nil) }) t.Run("test debug progagation", func(t *testing.T) { report := &mockReport{} - t1 := newTracer("service1", report, &Config{}) - t2 := newTracer("service2", report, &Config{}) + t1 := NewTracer("service1", report, true) + t2 := NewTracer("service2", report, true) sp1 := t1.New("opt_1", EnableDebug()) sp2 := sp1.Fork("", "opt_client") header := make(http.Header) @@ -106,7 +106,7 @@ func TestDapperPropagation(t *testing.T) { func BenchmarkSample(b *testing.B) { err := fmt.Errorf("test error") report := &mockReport{} - t1 := newTracer("service1", report, &Config{}) + t1 := NewTracer("service1", report, true) for i := 0; i < b.N; i++ { sp1 := t1.New("test_opt1") sp1.SetTag(TagString("test", "123")) @@ -122,7 +122,7 @@ func BenchmarkSample(b *testing.B) { func BenchmarkDisableSample(b *testing.B) { err := fmt.Errorf("test error") report := &mockReport{} - t1 := newTracer("service1", report, &Config{DisableSample: true}) + t1 := NewTracer("service1", report, true) for i := 0; i < b.N; i++ { sp1 := t1.New("test_opt1") sp1.SetTag(TagString("test", "123")) diff --git a/pkg/net/trace/marshal.go b/pkg/net/trace/marshal.go index 6324694e0..2d3f58226 100644 --- a/pkg/net/trace/marshal.go +++ b/pkg/net/trace/marshal.go @@ -20,14 +20,14 @@ var ( errSpanVersion = errs.New("trace: marshal not support version") ) -func marshalSpan(sp *span, version int32) ([]byte, error) { +func marshalSpan(sp *Span, version int32) ([]byte, error) { if version == protoVersion1 { return marshalSpanV1(sp) } return nil, errSpanVersion } -func marshalSpanV1(sp *span) ([]byte, error) { +func marshalSpanV1(sp *Span) ([]byte, error) { protoSpan := new(protogen.Span) protoSpan.Version = protoVersion1 protoSpan.ServiceName = sp.dapper.serviceName diff --git a/pkg/net/trace/marshal_test.go b/pkg/net/trace/marshal_test.go index 30fe50cc9..3c4d5831d 100644 --- a/pkg/net/trace/marshal_test.go +++ b/pkg/net/trace/marshal_test.go @@ -6,8 +6,8 @@ import ( func TestMarshalSpanV1(t *testing.T) { report := &mockReport{} - t1 := newTracer("service1", report, &Config{DisableSample: true}) - sp1 := t1.New("opt_test").(*span) + t1 := NewTracer("service1", report, true) + sp1 := t1.New("opt_test").(*Span) sp1.SetLog(Log("hello", "test123")) sp1.SetTag(TagString("tag1", "hell"), TagBool("booltag", true), TagFloat64("float64tag", 3.14159)) sp1.Finish(nil) diff --git a/pkg/net/trace/report.go b/pkg/net/trace/report.go index e9b8e4122..6bce7ea28 100644 --- a/pkg/net/trace/report.go +++ b/pkg/net/trace/report.go @@ -20,7 +20,7 @@ const ( // reporter trace reporter. type reporter interface { - WriteSpan(sp *span) error + WriteSpan(sp *Span) error Close() error } @@ -64,7 +64,7 @@ func (c *connReport) daemon() { c.done <- struct{}{} } -func (c *connReport) WriteSpan(sp *span) error { +func (c *connReport) WriteSpan(sp *Span) error { data, err := marshalSpan(sp, c.version) if err != nil { return err diff --git a/pkg/net/trace/span.go b/pkg/net/trace/span.go index 74d9eb756..f943f84a7 100644 --- a/pkg/net/trace/span.go +++ b/pkg/net/trace/span.go @@ -13,9 +13,10 @@ const ( _maxLogs = 256 ) -var _ Trace = &span{} +var _ Trace = &Span{} -type span struct { +// Span is a trace span. +type Span struct { dapper *dapper context spanContext operationName string @@ -26,11 +27,39 @@ type span struct { childs int } -func (s *span) TraceID() string { +func (s *Span) Name() string { + return s.operationName +} + +func (s *Span) StartTime() time.Time { + return s.startTime +} + +func (s *Span) Duration() time.Duration { + return s.duration +} + +func (s *Span) TraceID() string { return s.context.String() } -func (s *span) Fork(serviceName, operationName string) Trace { +func (s *Span) Tid() uint64 { + return s.context.traceID +} + +func (s *Span) SpanID() uint64 { + return s.context.spanID +} + +func (s *Span) ParentID() uint64 { + return s.context.parentID +} + +func (s *Span) Tags() []Tag { + return s.tags +} + +func (s *Span) Fork(serviceName, operationName string) Trace { if s.childs > _maxChilds { // if child span more than max childs set return noopspan return noopspan{} @@ -40,11 +69,11 @@ func (s *span) Fork(serviceName, operationName string) Trace { return s.dapper.newSpanWithContext(operationName, s.context).SetTag(TagString(TagSpanKind, "client")) } -func (s *span) Follow(serviceName, operationName string) Trace { +func (s *Span) Follow(serviceName, operationName string) Trace { return s.Fork(serviceName, operationName).SetTag(TagString(TagSpanKind, "producer")) } -func (s *span) Finish(perr *error) { +func (s *Span) Finish(perr *error) { s.duration = time.Since(s.startTime) if perr != nil && *perr != nil { err := *perr @@ -57,7 +86,7 @@ func (s *span) Finish(perr *error) { s.dapper.report(s) } -func (s *span) SetTag(tags ...Tag) Trace { +func (s *Span) SetTag(tags ...Tag) Trace { if !s.context.isSampled() && !s.context.isDebug() { return s } @@ -72,7 +101,7 @@ func (s *span) SetTag(tags ...Tag) Trace { // LogFields is an efficient and type-checked way to record key:value // NOTE current unsupport -func (s *span) SetLog(logs ...LogField) Trace { +func (s *Span) SetLog(logs ...LogField) Trace { if !s.context.isSampled() && !s.context.isDebug() { return s } @@ -85,7 +114,7 @@ func (s *span) SetLog(logs ...LogField) Trace { return s } -func (s *span) setLog(logs ...LogField) Trace { +func (s *Span) setLog(logs ...LogField) Trace { protoLog := &protogen.Log{ Timestamp: time.Now().UnixNano(), Fields: make([]*protogen.Field, len(logs)), @@ -98,15 +127,15 @@ func (s *span) setLog(logs ...LogField) Trace { } // Visit visits the k-v pair in trace, calling fn for each. -func (s *span) Visit(fn func(k, v string)) { +func (s *Span) Visit(fn func(k, v string)) { fn(KratosTraceID, s.context.String()) } // SetTitle reset trace title -func (s *span) SetTitle(operationName string) { +func (s *Span) SetTitle(operationName string) { s.operationName = operationName } -func (s *span) String() string { +func (s *Span) String() string { return s.context.String() } diff --git a/pkg/net/trace/span_test.go b/pkg/net/trace/span_test.go index 17a3cef4c..d91c6b358 100644 --- a/pkg/net/trace/span_test.go +++ b/pkg/net/trace/span_test.go @@ -12,14 +12,14 @@ import ( func TestSpan(t *testing.T) { report := &mockReport{} - t1 := newTracer("service1", report, &Config{DisableSample: true}) + t1 := NewTracer("service1", report, true) t.Run("test span string", func(t *testing.T) { - sp1 := t1.New("testfinish").(*span) + sp1 := t1.New("testfinish").(*Span) assert.NotEmpty(t, fmt.Sprint(sp1)) }) t.Run("test fork", func(t *testing.T) { - sp1 := t1.New("testfork").(*span) - sp2 := sp1.Fork("xxx", "opt_2").(*span) + sp1 := t1.New("testfork").(*Span) + sp2 := sp1.Fork("xxx", "opt_2").(*Span) assert.Equal(t, sp1.context.traceID, sp2.context.traceID) assert.Equal(t, sp1.context.spanID, sp2.context.parentID) t.Run("test max fork", func(t *testing.T) { @@ -39,14 +39,14 @@ func TestSpan(t *testing.T) { }) t.Run("test finish", func(t *testing.T) { t.Run("test finish ok", func(t *testing.T) { - sp1 := t1.New("testfinish").(*span) + sp1 := t1.New("testfinish").(*Span) time.Sleep(time.Millisecond) sp1.Finish(nil) assert.True(t, sp1.startTime.Unix() > 0) assert.True(t, sp1.duration > time.Microsecond) }) t.Run("test finish error", func(t *testing.T) { - sp1 := t1.New("testfinish").(*span) + sp1 := t1.New("testfinish").(*Span) time.Sleep(time.Millisecond) err := fmt.Errorf("🍻") sp1.Finish(&err) @@ -71,7 +71,7 @@ func TestSpan(t *testing.T) { assert.True(t, messageLog) }) t.Run("test finish error stack", func(t *testing.T) { - sp1 := t1.New("testfinish").(*span) + sp1 := t1.New("testfinish").(*Span) time.Sleep(time.Millisecond) err := fmt.Errorf("🍻") err = errors.WithStack(err) @@ -87,7 +87,7 @@ func TestSpan(t *testing.T) { assert.True(t, ok, "LogStack set") }) t.Run("test too many tags", func(t *testing.T) { - sp1 := t1.New("testfinish").(*span) + sp1 := t1.New("testfinish").(*Span) for i := 0; i < 1024; i++ { sp1.SetTag(Tag{Key: strconv.Itoa(i), Value: "hello"}) } @@ -96,7 +96,7 @@ func TestSpan(t *testing.T) { assert.Equal(t, sp1.tags[_maxTags].Value, "too many tags") }) t.Run("test too many logs", func(t *testing.T) { - sp1 := t1.New("testfinish").(*span) + sp1 := t1.New("testfinish").(*Span) for i := 0; i < 1024; i++ { sp1.SetLog(LogField{Key: strconv.Itoa(i), Value: "hello"}) } diff --git a/pkg/net/trace/util_test.go b/pkg/net/trace/util_test.go index c6223bd84..6d98908a1 100644 --- a/pkg/net/trace/util_test.go +++ b/pkg/net/trace/util_test.go @@ -9,7 +9,7 @@ import ( func TestFromContext(t *testing.T) { report := &mockReport{} - t1 := newTracer("service1", report, &Config{DisableSample: true}) + t1 := NewTracer("service1", report, true) sp1 := t1.New("test123") ctx := context.Background() ctx = NewContext(ctx, sp1) diff --git a/pkg/net/trace/zipkin/config.go b/pkg/net/trace/zipkin/config.go new file mode 100644 index 000000000..37e3037e8 --- /dev/null +++ b/pkg/net/trace/zipkin/config.go @@ -0,0 +1,30 @@ +package zipkin + +import ( + "time" + + "github.com/bilibili/kratos/pkg/conf/env" + "github.com/bilibili/kratos/pkg/net/trace" + xtime "github.com/bilibili/kratos/pkg/time" +) + +// Config config. +// url should be the endpoint to send the spans to, e.g. +// http://localhost:9411/api/v2/spans +type Config struct { + Endpoint string `dsn:"endpoint"` + BatchSize int `dsn:"query.batch_size,100"` + Timeout xtime.Duration `dsn:"query.timeout,200ms"` + DisableSample bool `dsn:"query.disable_sample"` +} + +// Init init trace report. +func Init(c *Config) { + if c.BatchSize == 0 { + c.BatchSize = 100 + } + if c.Timeout == 0 { + c.Timeout = xtime.Duration(200 * time.Millisecond) + } + trace.SetGlobalTracer(trace.NewTracer(env.AppID, newReport(c), c.DisableSample)) +} diff --git a/pkg/net/trace/zipkin/zipkin.go b/pkg/net/trace/zipkin/zipkin.go new file mode 100644 index 000000000..8cd1532b0 --- /dev/null +++ b/pkg/net/trace/zipkin/zipkin.go @@ -0,0 +1,64 @@ +package zipkin + +import ( + "fmt" + "time" + + "github.com/bilibili/kratos/pkg/net/trace" + "github.com/openzipkin/zipkin-go/model" + "github.com/openzipkin/zipkin-go/reporter" + "github.com/openzipkin/zipkin-go/reporter/http" +) + +type report struct { + rpt reporter.Reporter +} + +func newReport(c *Config) *report { + return &report{ + rpt: http.NewReporter(c.Endpoint, + http.Timeout(time.Duration(c.Timeout)), + http.BatchSize(c.BatchSize), + ), + } +} + +// WriteSpan write a trace span to queue. +func (r *report) WriteSpan(raw *trace.Span) (err error) { + traceID := model.TraceID{Low: raw.Tid()} + spanID := model.ID(raw.SpanID()) + parentID := model.ID(raw.ParentID()) + span := model.SpanModel{ + SpanContext: model.SpanContext{ + TraceID: traceID, + ID: spanID, + ParentID: &parentID, + }, + Name: raw.Name(), + Timestamp: raw.StartTime(), + Duration: raw.Duration(), + Tags: make(map[string]string, len(raw.Tags())), + } + for _, tag := range raw.Tags() { + switch tag.Key { + case trace.TagSpanKind: + span.Kind = model.Kind(tag.Value.(string)) + case trace.TagPeerService: + span.LocalEndpoint = &model.Endpoint{ServiceName: tag.Value.(string)} + default: + v, ok := tag.Value.(string) + if ok { + span.Tags[tag.Key] = v + } else { + span.Tags[tag.Key] = fmt.Sprint(v) + } + } + } + r.rpt.Send(span) + return +} + +// Close close the report. +func (r *report) Close() error { + return r.rpt.Close() +} diff --git a/pkg/net/trace/zipkin/zipkin_test.go b/pkg/net/trace/zipkin/zipkin_test.go new file mode 100644 index 000000000..34435b5d5 --- /dev/null +++ b/pkg/net/trace/zipkin/zipkin_test.go @@ -0,0 +1,33 @@ +package zipkin + +import ( + "net/http" + "testing" + "time" + + "github.com/bilibili/kratos/pkg/net/trace" + xtime "github.com/bilibili/kratos/pkg/time" +) + +func TestZipkin(t *testing.T) { + c := &Config{ + Endpoint: "http://127.0.0.1:9411/api/v2/spans", + Timeout: xtime.Duration(time.Second * 5), + BatchSize: 100, + } + report := newReport(c) + t1 := trace.NewTracer("service1", report, true) + t2 := trace.NewTracer("service2", report, true) + sp1 := t1.New("opt_1") + sp2 := sp1.Fork("", "opt_client") + header := make(http.Header) + t1.Inject(sp2, trace.HTTPFormat, header) + sp3, err := t2.Extract(trace.HTTPFormat, header) + if err != nil { + t.Fatal(err) + } + sp3.Finish(nil) + sp2.Finish(nil) + sp1.Finish(nil) + report.Close() +}