opt last append time of rolling policy (#430)

* opt last append time of rolling policy

* add test of rolling policy add

* modify test cases
pull/438/head
mikellxy 5 years ago committed by Tony
parent 60e2765549
commit 75348b05db
  1. 7
      pkg/stat/metric/rolling_policy.go
  2. 67
      pkg/stat/metric/rolling_policy_test.go

@ -38,7 +38,7 @@ func NewRollingPolicy(window *Window, opts RollingPolicyOpts) *RollingPolicy {
func (r *RollingPolicy) timespan() int { func (r *RollingPolicy) timespan() int {
v := int(time.Since(r.lastAppendTime) / r.bucketDuration) v := int(time.Since(r.lastAppendTime) / r.bucketDuration)
if v < r.size && v > -1 { // maybe time backwards if v > -1 { // maybe time backwards
return v return v
} }
return r.size return r.size
@ -48,9 +48,13 @@ func (r *RollingPolicy) add(f func(offset int, val float64), val float64) {
r.mu.Lock() r.mu.Lock()
timespan := r.timespan() timespan := r.timespan()
if timespan > 0 { if timespan > 0 {
r.lastAppendTime = r.lastAppendTime.Add(time.Duration(timespan * int(r.bucketDuration)))
offset := r.offset offset := r.offset
// reset the expired buckets // reset the expired buckets
s := offset + 1 s := offset + 1
if timespan > r.size {
timespan = r.size
}
e, e1 := s+timespan, 0 // e: reset offset must start from offset+1 e, e1 := s+timespan, 0 // e: reset offset must start from offset+1
if e > r.size { if e > r.size {
e1 = e - r.size e1 = e - r.size
@ -65,7 +69,6 @@ func (r *RollingPolicy) add(f func(offset int, val float64), val float64) {
offset = i offset = i
} }
r.offset = offset r.offset = offset
r.lastAppendTime = time.Now()
} }
f(r.offset, val) f(r.offset, val)
r.mu.Unlock() r.mu.Unlock()

@ -0,0 +1,67 @@
package metric
import (
"fmt"
"math/rand"
"testing"
"time"
)
func GetRollingPolicy() *RollingPolicy {
w := NewWindow(WindowOpts{Size: 10})
return NewRollingPolicy(w, RollingPolicyOpts{BucketDuration: 300 * time.Millisecond})
}
func Handler(t *testing.T, table []map[string][]int) {
for _, hm := range table {
var totalTs, lastOffset int
offsetAndPoints := hm["offsetAndPoints"]
timeSleep := hm["timeSleep"]
policy := GetRollingPolicy()
for i, n := range timeSleep {
totalTs += n
time.Sleep(time.Duration(n) * time.Millisecond)
policy.Add(1)
offset, points := offsetAndPoints[2*i], offsetAndPoints[2*i+1]
if int(policy.window.window[offset].Points[0]) != points {
t.Errorf("error, time since last append: %vms, last offset: %v", totalTs, lastOffset)
}
lastOffset = offset
}
}
}
func TestRollingPolicy_Add(t *testing.T) {
rand.Seed(time.Now().Unix())
// test add after 400ms and 601ms relative to the policy created time
policy := GetRollingPolicy()
time.Sleep(400 * time.Millisecond)
policy.Add(1)
time.Sleep(201 * time.Millisecond)
policy.Add(1)
for _, b := range policy.window.window {
fmt.Println(b.Points)
}
if int(policy.window.window[1].Points[0]) != 1 {
t.Errorf("error, time since last append: %vms, last offset: %v", 300, 0)
}
if int(policy.window.window[2].Points[0]) != 1 {
t.Errorf("error, time since last append: %vms, last offset: %v", 301, 0)
}
// test func timespan return real span
table := []map[string][]int{
{
"timeSleep": []int{294, 3200},
"offsetAndPoints": []int{0, 1, 0, 1},
},
{
"timeSleep": []int{305, 3200, 6400},
"offsetAndPoints": []int{1, 1, 1, 1, 1, 1},
},
}
Handler(t, table)
}
Loading…
Cancel
Save