From f2c6751ed432c39f6cb2ccdb28e277f139041d80 Mon Sep 17 00:00:00 2001 From: Astone Date: Sun, 29 Mar 2020 11:47:55 +0800 Subject: [PATCH] sync/pipeline: using timer replace ticker (#477) --- pkg/sync/pipeline/pipeline.go | 29 +++++++++++++++-------------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/pkg/sync/pipeline/pipeline.go b/pkg/sync/pipeline/pipeline.go index 22221c6f6..7347e5f6c 100644 --- a/pkg/sync/pipeline/pipeline.go +++ b/pkg/sync/pipeline/pipeline.go @@ -169,17 +169,18 @@ func (p *Pipeline) Close() (err error) { func (p *Pipeline) mergeproc(mirror bool, index int, ch <-chan *message) { defer p.wait.Done() var ( - m *message - vals = make(map[string][]interface{}, p.config.MaxSize) - closed bool - count int - inteval = p.config.Interval - oldTicker = true + m *message + vals = make(map[string][]interface{}, p.config.MaxSize) + closed bool + count int + inteval = p.config.Interval + timeout = false ) if index > 0 { inteval = xtime.Duration(int64(index) * (int64(p.config.Interval) / int64(p.config.Worker))) } - ticker := time.NewTicker(time.Duration(inteval)) + timer := time.NewTimer(time.Duration(inteval)) + defer timer.Stop() for { select { case m = <-ch: @@ -193,12 +194,8 @@ func (p *Pipeline) mergeproc(mirror bool, index int, ch <-chan *message) { break } continue - case <-ticker.C: - if oldTicker { - ticker.Stop() - ticker = time.NewTicker(time.Duration(p.config.Interval)) - oldTicker = false - } + case <-timer.C: + timeout = true } name := p.name process := count @@ -215,8 +212,12 @@ func (p *Pipeline) mergeproc(mirror bool, index int, ch <-chan *message) { _metricChanLen.Set(float64(len(ch)), name, strconv.Itoa(index)) _metricCount.Add(float64(process), name, strconv.Itoa(index)) if closed { - ticker.Stop() return } + if !timer.Stop() && !timeout { + <-timer.C + timeout = false + } + timer.Reset(time.Duration(p.config.Interval)) } }