diff --git a/pkg/sync/pipeline/CHANGELOG.md b/pkg/sync/pipeline/CHANGELOG.md new file mode 100755 index 000000000..2120a2356 --- /dev/null +++ b/pkg/sync/pipeline/CHANGELOG.md @@ -0,0 +1,9 @@ +### pipeline + +#### Version 1.2.0 +> 1. 默认为平滑触发事件 +> 2. 增加metric上报 +#### Version 1.1.0 +> 1. 增加平滑时间的支持 +#### Version 1.0.0 +> 1. 提供聚合方法 内部区分压测流量 diff --git a/pkg/sync/pipeline/fanout/CHANGELOG.md b/pkg/sync/pipeline/fanout/CHANGELOG.md new file mode 100755 index 000000000..c289a6425 --- /dev/null +++ b/pkg/sync/pipeline/fanout/CHANGELOG.md @@ -0,0 +1,6 @@ +### pipeline/fanout + +#### Version 1.1.0 +> 1. 增加处理速度metric上报 +#### Version 1.0.0 +> 1. library/cache包改为fanout diff --git a/pkg/sync/pipeline/fanout/fanout.go b/pkg/sync/pipeline/fanout/fanout.go index 79ac0da8c..c17e380ea 100644 --- a/pkg/sync/pipeline/fanout/fanout.go +++ b/pkg/sync/pipeline/fanout/fanout.go @@ -6,17 +6,17 @@ import ( "runtime" "sync" - "github.com/bilibili/kratos/pkg/log" - "github.com/bilibili/kratos/pkg/net/metadata" - "github.com/bilibili/kratos/pkg/net/trace" + "go-common/library/log" + "go-common/library/net/metadata" + "go-common/library/net/trace" ) var ( // ErrFull chan full. ErrFull = errors.New("fanout: chan full") traceTags = []trace.Tag{ - trace.Tag{Key: trace.TagSpanKind, Value: "background"}, - trace.Tag{Key: trace.TagComponent, Value: "sync/pipeline/fanout"}, + {Key: trace.TagSpanKind, Value: "background"}, + {Key: trace.TagComponent, Value: "sync/pipeline/fanout"}, } ) @@ -67,7 +67,7 @@ type Fanout struct { // New new a fanout struct. func New(name string, opts ...Option) *Fanout { if name == "" { - name = "fanout" + name = "anonymous" } o := &options{ worker: 1, @@ -96,6 +96,7 @@ func (c *Fanout) proc() { case t := <-c.ch: wrapFunc(t.f)(t.ctx) _metricChanSize.Set(float64(len(c.ch)), c.name) + _metricCount.Inc(c.name) case <-c.ctx.Done(): return } diff --git a/pkg/sync/pipeline/fanout/metrics.go b/pkg/sync/pipeline/fanout/metrics.go index 0014b7f35..42f1868d7 100644 --- a/pkg/sync/pipeline/fanout/metrics.go +++ b/pkg/sync/pipeline/fanout/metrics.go @@ -1,15 +1,27 @@ package fanout -import "github.com/bilibili/kratos/pkg/stat/metric" +import ( + "go-common/library/stat/metric" +) -const namespace = "sync" +const ( + _metricNamespace = "sync" + _metricSubSystem = "pipeline_fanout" +) var ( _metricChanSize = metric.NewGaugeVec(&metric.GaugeVecOpts{ - Namespace: namespace, - Subsystem: "pipeline_fanout", - Name: "current", + Namespace: _metricNamespace, + Subsystem: _metricSubSystem, + Name: "chan_len", Help: "sync pipeline fanout current channel size.", Labels: []string{"name"}, }) + _metricCount = metric.NewCounterVec(&metric.CounterVecOpts{ + Namespace: _metricNamespace, + Subsystem: _metricSubSystem, + Name: "process_count", + Help: "process count", + Labels: []string{"name"}, + }) ) diff --git a/pkg/sync/pipeline/pipeline.go b/pkg/sync/pipeline/pipeline.go index ef9263cb9..85fb7f8bb 100644 --- a/pkg/sync/pipeline/pipeline.go +++ b/pkg/sync/pipeline/pipeline.go @@ -3,16 +3,38 @@ package pipeline import ( "context" "errors" + "strconv" "sync" "time" - "github.com/bilibili/kratos/pkg/net/metadata" - xtime "github.com/bilibili/kratos/pkg/time" + "go-common/library/net/metadata" + "go-common/library/stat/metric" + xtime "go-common/library/time" ) // ErrFull channel full error var ErrFull = errors.New("channel full") +const _metricNamespace = "sync" +const _metricSubSystem = "pipeline" + +var ( + _metricCount = metric.NewCounterVec(&metric.CounterVecOpts{ + Namespace: _metricNamespace, + Subsystem: _metricSubSystem, + Name: "process_count", + Help: "process count", + Labels: []string{"name", "chan"}, + }) + _metricChanLen = metric.NewGaugeVec(&metric.GaugeVecOpts{ + Namespace: _metricNamespace, + Subsystem: _metricSubSystem, + Name: "chan_len", + Help: "channel length", + Labels: []string{"name", "chan"}, + }) +) + type message struct { key string value interface{} @@ -26,6 +48,7 @@ type Pipeline struct { mirrorChans []chan *message config *Config wait sync.WaitGroup + name string } // Config Pipeline config @@ -38,8 +61,8 @@ type Config struct { Buffer int // Worker channel number Worker int - // Smooth smoothing interval - Smooth bool + // Name use for metrics + Name string } func (c *Config) fix() { @@ -55,6 +78,9 @@ func (c *Config) fix() { if c.Worker <= 0 { c.Worker = 10 } + if c.Name == "" { + c.Name = "anonymous" + } } // NewPipeline new pipline @@ -67,6 +93,7 @@ func NewPipeline(config *Config) (res *Pipeline) { chans: make([]chan *message, config.Worker), mirrorChans: make([]chan *message, config.Worker), config: config, + name: config.Name, } for i := 0; i < config.Worker; i++ { res.chans[i] = make(chan *message, config.Buffer) @@ -144,7 +171,7 @@ func (p *Pipeline) mergeproc(mirror bool, index int, ch <-chan *message) { inteval = p.config.Interval oldTicker = true ) - if p.config.Smooth && index > 0 { + if index > 0 { inteval = xtime.Duration(int64(index) * (int64(p.config.Interval) / int64(p.config.Worker))) } ticker := time.NewTicker(time.Duration(inteval)) @@ -162,21 +189,26 @@ func (p *Pipeline) mergeproc(mirror bool, index int, ch <-chan *message) { } continue case <-ticker.C: - if p.config.Smooth && oldTicker { + if oldTicker { ticker.Stop() ticker = time.NewTicker(time.Duration(p.config.Interval)) oldTicker = false } } + name := p.name + process := count if len(vals) > 0 { ctx := context.Background() if mirror { ctx = metadata.NewContext(ctx, metadata.MD{metadata.Mirror: "1"}) + name = "mirror_" + name } p.Do(ctx, index, vals) vals = make(map[string][]interface{}, p.config.MaxSize) count = 0 } + _metricChanLen.Set(float64(len(ch)), name, strconv.Itoa(index)) + _metricCount.Add(float64(process), name, strconv.Itoa(index)) if closed { ticker.Stop() return