You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
98 lines
2.3 KiB
98 lines
2.3 KiB
6 years ago
|
package metric
|
||
|
|
||
|
import (
|
||
|
"sync"
|
||
|
"time"
|
||
|
)
|
||
|
|
||
|
// RollingPolicy is a policy for ring window based on time duration.
|
||
|
// RollingPolicy moves bucket offset with time duration.
|
||
|
// e.g. If the last point is appended one bucket duration ago,
|
||
|
// RollingPolicy will increment current offset.
|
||
|
type RollingPolicy struct {
|
||
|
mu sync.RWMutex
|
||
|
size int
|
||
|
window *Window
|
||
|
offset int
|
||
|
|
||
|
bucketDuration time.Duration
|
||
|
lastAppendTime time.Time
|
||
|
}
|
||
|
|
||
|
// RollingPolicyOpts carries the settings used by NewRollingPolicy.
type RollingPolicyOpts struct {
	// BucketDuration is the time span that a single bucket of the window
	// covers.
	BucketDuration time.Duration
}
|
||
|
|
||
|
// NewRollingPolicy creates a new RollingPolicy based on the given window and RollingPolicyOpts.
|
||
|
func NewRollingPolicy(window *Window, opts RollingPolicyOpts) *RollingPolicy {
|
||
|
return &RollingPolicy{
|
||
|
window: window,
|
||
|
size: window.Size(),
|
||
|
offset: 0,
|
||
|
|
||
|
bucketDuration: opts.BucketDuration,
|
||
|
lastAppendTime: time.Now(),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (r *RollingPolicy) timespan() int {
|
||
|
v := int(time.Since(r.lastAppendTime) / r.bucketDuration)
|
||
|
if v < r.size && v > -1 { // maybe time backwards
|
||
|
return v
|
||
|
}
|
||
|
return r.size
|
||
|
}
|
||
|
|
||
|
func (r *RollingPolicy) add(f func(offset int, val float64), val float64) {
|
||
|
r.mu.Lock()
|
||
|
timespan := r.timespan()
|
||
|
if timespan > 0 {
|
||
|
offset := r.offset
|
||
|
// reset the expired buckets
|
||
|
s := offset + 1
|
||
|
e, e1 := s+timespan, 0 // e: reset offset must start from offset+1
|
||
|
if e > r.size {
|
||
|
e1 = e - r.size
|
||
|
e = r.size
|
||
|
}
|
||
|
for i := s; i < e; i++ {
|
||
|
r.window.ResetBucket(i)
|
||
|
offset = i
|
||
|
}
|
||
|
for i := 0; i < e1; i++ {
|
||
|
r.window.ResetBucket(i)
|
||
|
offset = i
|
||
|
}
|
||
|
r.offset = offset
|
||
|
r.lastAppendTime = time.Now()
|
||
|
}
|
||
|
f(r.offset, val)
|
||
|
r.mu.Unlock()
|
||
|
}
|
||
|
|
||
|
// Append appends the given points to the window.
|
||
|
func (r *RollingPolicy) Append(val float64) {
|
||
|
r.add(r.window.Append, val)
|
||
|
}
|
||
|
|
||
|
// Add adds the given value to the latest point within bucket.
|
||
|
func (r *RollingPolicy) Add(val float64) {
|
||
|
r.add(r.window.Add, val)
|
||
|
}
|
||
|
|
||
|
// Reduce applies the reduction function to all buckets within the window.
|
||
|
func (r *RollingPolicy) Reduce(f func(Iterator) float64) (val float64) {
|
||
|
r.mu.RLock()
|
||
|
timespan := r.timespan()
|
||
|
if count := r.size - timespan; count > 0 {
|
||
|
offset := r.offset + timespan + 1
|
||
|
if offset >= r.size {
|
||
|
offset = offset - r.size
|
||
|
}
|
||
|
val = f(r.window.Iterator(offset, count))
|
||
|
}
|
||
|
r.mu.RUnlock()
|
||
|
return val
|
||
|
}
|