|
|
|
@ -174,12 +174,13 @@ func (p *Pipeline) mergeproc(mirror bool, index int, ch <-chan *message) { |
|
|
|
|
closed bool |
|
|
|
|
count int |
|
|
|
|
inteval = p.config.Interval |
|
|
|
|
oldTicker = true |
|
|
|
|
timeout = false |
|
|
|
|
) |
|
|
|
|
if index > 0 { |
|
|
|
|
inteval = xtime.Duration(int64(index) * (int64(p.config.Interval) / int64(p.config.Worker))) |
|
|
|
|
} |
|
|
|
|
ticker := time.NewTicker(time.Duration(inteval)) |
|
|
|
|
timer := time.NewTimer(time.Duration(inteval)) |
|
|
|
|
defer timer.Stop() |
|
|
|
|
for { |
|
|
|
|
select { |
|
|
|
|
case m = <-ch: |
|
|
|
@ -193,12 +194,8 @@ func (p *Pipeline) mergeproc(mirror bool, index int, ch <-chan *message) { |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
continue |
|
|
|
|
case <-ticker.C: |
|
|
|
|
if oldTicker { |
|
|
|
|
ticker.Stop() |
|
|
|
|
ticker = time.NewTicker(time.Duration(p.config.Interval)) |
|
|
|
|
oldTicker = false |
|
|
|
|
} |
|
|
|
|
case <-timer.C: |
|
|
|
|
timeout = true |
|
|
|
|
} |
|
|
|
|
name := p.name |
|
|
|
|
process := count |
|
|
|
@ -215,8 +212,12 @@ func (p *Pipeline) mergeproc(mirror bool, index int, ch <-chan *message) { |
|
|
|
|
_metricChanLen.Set(float64(len(ch)), name, strconv.Itoa(index)) |
|
|
|
|
_metricCount.Add(float64(process), name, strconv.Itoa(index)) |
|
|
|
|
if closed { |
|
|
|
|
ticker.Stop() |
|
|
|
|
return |
|
|
|
|
} |
|
|
|
|
if !timer.Stop() && !timeout { |
|
|
|
|
<-timer.C |
|
|
|
|
timeout = false |
|
|
|
|
} |
|
|
|
|
timer.Reset(time.Duration(p.config.Interval)) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|