You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
186 lines
3.5 KiB
186 lines
3.5 KiB
6 years ago
|
package aqm
|
||
|
|
||
|
import (
|
||
|
"context"
|
||
|
"math"
|
||
|
"sync"
|
||
|
"time"
|
||
|
|
||
|
"github.com/bilibili/Kratos/pkg/ecode"
|
||
|
)
|
||
|
|
||
|
// Config codel config.
|
||
|
type Config struct {
|
||
|
Target int64 // target queue delay (default 20 ms).
|
||
|
Internal int64 // sliding minimum time window width (default 500 ms)
|
||
|
}
|
||
|
|
||
|
// Stat is the Statistics of codel.
|
||
|
type Stat struct {
|
||
|
Dropping bool
|
||
|
FaTime int64
|
||
|
DropNext int64
|
||
|
Packets int
|
||
|
}
|
||
|
|
||
|
type packet struct {
|
||
|
ch chan bool
|
||
|
ts int64
|
||
|
}
|
||
|
|
||
|
var defaultConf = &Config{
|
||
|
Target: 50,
|
||
|
Internal: 500,
|
||
|
}
|
||
|
|
||
|
// Queue queue is CoDel req buffer queue.
|
||
|
type Queue struct {
|
||
|
pool sync.Pool
|
||
|
packets chan packet
|
||
|
|
||
|
mux sync.RWMutex
|
||
|
conf *Config
|
||
|
count int64
|
||
|
dropping bool // Equal to 1 if in drop state
|
||
|
faTime int64 // Time when we'll declare we're above target (0 if below)
|
||
|
dropNext int64 // Packets dropped since going into drop state
|
||
|
}
|
||
|
|
||
|
// Default new a default codel queue.
|
||
|
func Default() *Queue {
|
||
|
return New(defaultConf)
|
||
|
}
|
||
|
|
||
|
// New new codel queue.
|
||
|
func New(conf *Config) *Queue {
|
||
|
if conf == nil {
|
||
|
conf = defaultConf
|
||
|
}
|
||
|
q := &Queue{
|
||
|
packets: make(chan packet, 2048),
|
||
|
conf: conf,
|
||
|
}
|
||
|
q.pool.New = func() interface{} {
|
||
|
return make(chan bool)
|
||
|
}
|
||
|
return q
|
||
|
}
|
||
|
|
||
|
// Reload set queue config.
|
||
|
func (q *Queue) Reload(c *Config) {
|
||
|
if c == nil || c.Internal <= 0 || c.Target <= 0 {
|
||
|
return
|
||
|
}
|
||
|
// TODO codel queue size
|
||
|
q.mux.Lock()
|
||
|
q.conf = c
|
||
|
q.mux.Unlock()
|
||
|
}
|
||
|
|
||
|
// Stat return the statistics of codel
|
||
|
func (q *Queue) Stat() Stat {
|
||
|
q.mux.Lock()
|
||
|
defer q.mux.Unlock()
|
||
|
return Stat{
|
||
|
Dropping: q.dropping,
|
||
|
FaTime: q.faTime,
|
||
|
DropNext: q.dropNext,
|
||
|
Packets: len(q.packets),
|
||
|
}
|
||
|
}
|
||
|
|
||
|
// Push req into CoDel request buffer queue.
|
||
|
// if return error is nil,the caller must call q.Done() after finish request handling
|
||
|
func (q *Queue) Push(ctx context.Context) (err error) {
|
||
|
r := packet{
|
||
|
ch: q.pool.Get().(chan bool),
|
||
|
ts: time.Now().UnixNano() / int64(time.Millisecond),
|
||
|
}
|
||
|
select {
|
||
|
case q.packets <- r:
|
||
|
default:
|
||
|
err = ecode.LimitExceed
|
||
|
q.pool.Put(r.ch)
|
||
|
}
|
||
|
if err == nil {
|
||
|
select {
|
||
|
case drop := <-r.ch:
|
||
|
if drop {
|
||
|
err = ecode.LimitExceed
|
||
|
}
|
||
|
q.pool.Put(r.ch)
|
||
|
case <-ctx.Done():
|
||
|
err = ecode.Deadline
|
||
|
}
|
||
|
}
|
||
|
return
|
||
|
}
|
||
|
|
||
|
// Pop req from CoDel request buffer queue.
|
||
|
func (q *Queue) Pop() {
|
||
|
for {
|
||
|
select {
|
||
|
case p := <-q.packets:
|
||
|
drop := q.judge(p)
|
||
|
select {
|
||
|
case p.ch <- drop:
|
||
|
if !drop {
|
||
|
return
|
||
|
}
|
||
|
default:
|
||
|
q.pool.Put(p.ch)
|
||
|
}
|
||
|
default:
|
||
|
return
|
||
|
}
|
||
|
}
|
||
|
}
|
||
|
|
||
|
func (q *Queue) controlLaw(now int64) int64 {
|
||
|
q.dropNext = now + int64(float64(q.conf.Internal)/math.Sqrt(float64(q.count)))
|
||
|
return q.dropNext
|
||
|
}
|
||
|
|
||
|
// judge decide if the packet should drop or not.
|
||
|
func (q *Queue) judge(p packet) (drop bool) {
|
||
|
now := time.Now().UnixNano() / int64(time.Millisecond)
|
||
|
sojurn := now - p.ts
|
||
|
q.mux.Lock()
|
||
|
defer q.mux.Unlock()
|
||
|
if sojurn < q.conf.Target {
|
||
|
q.faTime = 0
|
||
|
} else if q.faTime == 0 {
|
||
|
q.faTime = now + q.conf.Internal
|
||
|
} else if now >= q.faTime {
|
||
|
drop = true
|
||
|
}
|
||
|
if q.dropping {
|
||
|
if !drop {
|
||
|
// sojourn time below target - leave dropping state
|
||
|
q.dropping = false
|
||
|
} else if now > q.dropNext {
|
||
|
q.count++
|
||
|
q.dropNext = q.controlLaw(q.dropNext)
|
||
|
drop = true
|
||
|
return
|
||
|
}
|
||
|
} else if drop && (now-q.dropNext < q.conf.Internal || now-q.faTime >= q.conf.Internal) {
|
||
|
q.dropping = true
|
||
|
// If we're in a drop cycle, the drop rate that controlled the queue
|
||
|
// on the last cycle is a good starting point to control it now.
|
||
|
if now-q.dropNext < q.conf.Internal {
|
||
|
if q.count > 2 {
|
||
|
q.count = q.count - 2
|
||
|
} else {
|
||
|
q.count = 1
|
||
|
}
|
||
|
} else {
|
||
|
q.count = 1
|
||
|
}
|
||
|
q.dropNext = q.controlLaw(now)
|
||
|
drop = true
|
||
|
return
|
||
|
}
|
||
|
return
|
||
|
}
|