add metrics to databusutil and pipeline

pull/306/head
felixhao 5 years ago
parent 918eed4df6
commit a48d4c46fe
  1. 9
      pkg/sync/pipeline/CHANGELOG.md
  2. 6
      pkg/sync/pipeline/fanout/CHANGELOG.md
  3. 13
      pkg/sync/pipeline/fanout/fanout.go
  4. 22
      pkg/sync/pipeline/fanout/metrics.go
  5. 44
      pkg/sync/pipeline/pipeline.go

@ -0,0 +1,9 @@
### pipeline
#### Version 1.2.0
> 1. 默认为平滑触发事件
> 2. 增加metric上报
#### Version 1.1.0
> 1. 增加平滑时间的支持
#### Version 1.0.0
> 1. 提供聚合方法 内部区分压测流量

@ -0,0 +1,6 @@
### pipeline/fanout
#### Version 1.1.0
> 1. 增加处理速度metric上报
#### Version 1.0.0
> 1. library/cache包改为fanout

@ -6,17 +6,17 @@ import (
"runtime"
"sync"
"github.com/bilibili/kratos/pkg/log"
"github.com/bilibili/kratos/pkg/net/metadata"
"github.com/bilibili/kratos/pkg/net/trace"
"go-common/library/log"
"go-common/library/net/metadata"
"go-common/library/net/trace"
)
var (
// ErrFull chan full.
ErrFull = errors.New("fanout: chan full")
traceTags = []trace.Tag{
trace.Tag{Key: trace.TagSpanKind, Value: "background"},
trace.Tag{Key: trace.TagComponent, Value: "sync/pipeline/fanout"},
{Key: trace.TagSpanKind, Value: "background"},
{Key: trace.TagComponent, Value: "sync/pipeline/fanout"},
}
)
@ -67,7 +67,7 @@ type Fanout struct {
// New new a fanout struct.
func New(name string, opts ...Option) *Fanout {
if name == "" {
name = "fanout"
name = "anonymous"
}
o := &options{
worker: 1,
@ -96,6 +96,7 @@ func (c *Fanout) proc() {
case t := <-c.ch:
wrapFunc(t.f)(t.ctx)
_metricChanSize.Set(float64(len(c.ch)), c.name)
_metricCount.Inc(c.name)
case <-c.ctx.Done():
return
}

@ -1,15 +1,27 @@
package fanout
import "github.com/bilibili/kratos/pkg/stat/metric"
import (
"go-common/library/stat/metric"
)
const namespace = "sync"
const (
_metricNamespace = "sync"
_metricSubSystem = "pipeline_fanout"
)
var (
_metricChanSize = metric.NewGaugeVec(&metric.GaugeVecOpts{
Namespace: namespace,
Subsystem: "pipeline_fanout",
Name: "current",
Namespace: _metricNamespace,
Subsystem: _metricSubSystem,
Name: "chan_len",
Help: "sync pipeline fanout current channel size.",
Labels: []string{"name"},
})
_metricCount = metric.NewCounterVec(&metric.CounterVecOpts{
Namespace: _metricNamespace,
Subsystem: _metricSubSystem,
Name: "process_count",
Help: "process count",
Labels: []string{"name"},
})
)

@ -3,16 +3,38 @@ package pipeline
import (
"context"
"errors"
"strconv"
"sync"
"time"
"github.com/bilibili/kratos/pkg/net/metadata"
xtime "github.com/bilibili/kratos/pkg/time"
"go-common/library/net/metadata"
"go-common/library/stat/metric"
xtime "go-common/library/time"
)
// ErrFull channel full error
var ErrFull = errors.New("channel full")
const _metricNamespace = "sync"
const _metricSubSystem = "pipeline"
var (
_metricCount = metric.NewCounterVec(&metric.CounterVecOpts{
Namespace: _metricNamespace,
Subsystem: _metricSubSystem,
Name: "process_count",
Help: "process count",
Labels: []string{"name", "chan"},
})
_metricChanLen = metric.NewGaugeVec(&metric.GaugeVecOpts{
Namespace: _metricNamespace,
Subsystem: _metricSubSystem,
Name: "chan_len",
Help: "channel length",
Labels: []string{"name", "chan"},
})
)
type message struct {
key string
value interface{}
@ -26,6 +48,7 @@ type Pipeline struct {
mirrorChans []chan *message
config *Config
wait sync.WaitGroup
name string
}
// Config Pipeline config
@ -38,8 +61,8 @@ type Config struct {
Buffer int
// Worker channel number
Worker int
// Smooth smoothing interval
Smooth bool
// Name use for metrics
Name string
}
func (c *Config) fix() {
@ -55,6 +78,9 @@ func (c *Config) fix() {
if c.Worker <= 0 {
c.Worker = 10
}
if c.Name == "" {
c.Name = "anonymous"
}
}
// NewPipeline new pipline
@ -67,6 +93,7 @@ func NewPipeline(config *Config) (res *Pipeline) {
chans: make([]chan *message, config.Worker),
mirrorChans: make([]chan *message, config.Worker),
config: config,
name: config.Name,
}
for i := 0; i < config.Worker; i++ {
res.chans[i] = make(chan *message, config.Buffer)
@ -144,7 +171,7 @@ func (p *Pipeline) mergeproc(mirror bool, index int, ch <-chan *message) {
inteval = p.config.Interval
oldTicker = true
)
if p.config.Smooth && index > 0 {
if index > 0 {
inteval = xtime.Duration(int64(index) * (int64(p.config.Interval) / int64(p.config.Worker)))
}
ticker := time.NewTicker(time.Duration(inteval))
@ -162,21 +189,26 @@ func (p *Pipeline) mergeproc(mirror bool, index int, ch <-chan *message) {
}
continue
case <-ticker.C:
if p.config.Smooth && oldTicker {
if oldTicker {
ticker.Stop()
ticker = time.NewTicker(time.Duration(p.config.Interval))
oldTicker = false
}
}
name := p.name
process := count
if len(vals) > 0 {
ctx := context.Background()
if mirror {
ctx = metadata.NewContext(ctx, metadata.MD{metadata.Mirror: "1"})
name = "mirror_" + name
}
p.Do(ctx, index, vals)
vals = make(map[string][]interface{}, p.config.MaxSize)
count = 0
}
_metricChanLen.Set(float64(len(ch)), name, strconv.Itoa(index))
_metricCount.Add(float64(process), name, strconv.Itoa(index))
if closed {
ticker.Stop()
return

Loading…
Cancel
Save