fix codel to accept request add config outstanding (#698)

pull/808/head
Taction 4 years ago committed by GitHub
parent 6e49fe9ac6
commit 5e3df708d8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 26
      pkg/container/queue/aqm/codel.go
  2. 26
      pkg/container/queue/aqm/codel_test.go

@ -13,6 +13,7 @@ import (
type Config struct { type Config struct {
Target int64 // target queue delay (default 20 ms). Target int64 // target queue delay (default 20 ms).
Internal int64 // sliding minimum time window width (default 500 ms) Internal int64 // sliding minimum time window width (default 500 ms)
MaxOutstanding int64 //max num of concurrent acquires
} }
// Stat is the Statistics of codel. // Stat is the Statistics of codel.
@ -31,6 +32,7 @@ type packet struct {
var defaultConf = &Config{ var defaultConf = &Config{
Target: 50, Target: 50,
Internal: 500, Internal: 500,
MaxOutstanding: 40,
} }
// Queue queue is CoDel req buffer queue. // Queue queue is CoDel req buffer queue.
@ -44,6 +46,7 @@ type Queue struct {
dropping bool // Equal to 1 if in drop state dropping bool // Equal to 1 if in drop state
faTime int64 // Time when we'll declare we're above target (0 if below) faTime int64 // Time when we'll declare we're above target (0 if below)
dropNext int64 // Packets dropped since going into drop state dropNext int64 // Packets dropped since going into drop state
outstanding int64
} }
// Default new a default codel queue. // Default new a default codel queue.
@ -68,7 +71,7 @@ func New(conf *Config) *Queue {
// Reload set queue config. // Reload set queue config.
func (q *Queue) Reload(c *Config) { func (q *Queue) Reload(c *Config) {
if c == nil || c.Internal <= 0 || c.Target <= 0 { if c == nil || c.Internal <= 0 || c.Target <= 0 || c.MaxOutstanding <= 0 {
return return
} }
// TODO codel queue size // TODO codel queue size
@ -92,6 +95,13 @@ func (q *Queue) Stat() Stat {
// Push req into CoDel request buffer queue. // Push req into CoDel request buffer queue.
// if return error is nil,the caller must call q.Done() after finish request handling // if return error is nil,the caller must call q.Done() after finish request handling
func (q *Queue) Push(ctx context.Context) (err error) { func (q *Queue) Push(ctx context.Context) (err error) {
q.mux.Lock()
if q.outstanding < q.conf.MaxOutstanding && len(q.packets) == 0 {
q.outstanding ++
q.mux.Unlock()
return
}
q.mux.Unlock()
r := packet{ r := packet{
ch: q.pool.Get().(chan bool), ch: q.pool.Get().(chan bool),
ts: time.Now().UnixNano() / int64(time.Millisecond), ts: time.Now().UnixNano() / int64(time.Millisecond),
@ -118,6 +128,14 @@ func (q *Queue) Push(ctx context.Context) (err error) {
// Pop req from CoDel request buffer queue. // Pop req from CoDel request buffer queue.
func (q *Queue) Pop() { func (q *Queue) Pop() {
q.mux.Lock()
q.outstanding --
if q.outstanding < 0 {
q.outstanding = 0
q.mux.Unlock()
return
}
defer q.mux.Unlock()
for { for {
select { select {
case p := <-q.packets: case p := <-q.packets:
@ -145,8 +163,6 @@ func (q *Queue) controlLaw(now int64) int64 {
func (q *Queue) judge(p packet) (drop bool) { func (q *Queue) judge(p packet) (drop bool) {
now := time.Now().UnixNano() / int64(time.Millisecond) now := time.Now().UnixNano() / int64(time.Millisecond)
sojurn := now - p.ts sojurn := now - p.ts
q.mux.Lock()
defer q.mux.Unlock()
if sojurn < q.conf.Target { if sojurn < q.conf.Target {
q.faTime = 0 q.faTime = 0
} else if q.faTime == 0 { } else if q.faTime == 0 {
@ -161,7 +177,6 @@ func (q *Queue) judge(p packet) (drop bool) {
} else if now > q.dropNext { } else if now > q.dropNext {
q.count++ q.count++
q.dropNext = q.controlLaw(q.dropNext) q.dropNext = q.controlLaw(q.dropNext)
drop = true
return return
} }
} else if drop && (now-q.dropNext < q.conf.Internal || now-q.faTime >= q.conf.Internal) { } else if drop && (now-q.dropNext < q.conf.Internal || now-q.faTime >= q.conf.Internal) {
@ -178,8 +193,9 @@ func (q *Queue) judge(p packet) (drop bool) {
q.count = 1 q.count = 1
} }
q.dropNext = q.controlLaw(now) q.dropNext = q.controlLaw(now)
drop = true
return return
} }
q.outstanding ++
drop = false
return return
} }

@ -14,6 +14,7 @@ import (
var testConf = &Config{ var testConf = &Config{
Target: 20, Target: 20,
Internal: 500, Internal: 500,
MaxOutstanding: 20,
} }
var qps = time.Microsecond * 2000 var qps = time.Microsecond * 2000
@ -22,9 +23,10 @@ func TestCoDel1200(t *testing.T) {
q := New(testConf) q := New(testConf)
drop := new(int64) drop := new(int64)
tm := new(int64) tm := new(int64)
accept := new(int64)
delay := time.Millisecond * 3000 delay := time.Millisecond * 3000
testPush(q, qps, delay, drop, tm) testPush(q, qps, delay, drop, tm, accept)
fmt.Printf("qps %v process time %v drop %d timeout %d \n", int64(time.Second/qps), delay, *drop, *tm) fmt.Printf("qps %v process time %v drop %d timeout %d accept %d \n", int64(time.Second/qps), delay, *drop, *tm, *accept)
time.Sleep(time.Second) time.Sleep(time.Second)
} }
@ -32,9 +34,10 @@ func TestCoDel200(t *testing.T) {
q := New(testConf) q := New(testConf)
drop := new(int64) drop := new(int64)
tm := new(int64) tm := new(int64)
accept := new(int64)
delay := time.Millisecond * 2000 delay := time.Millisecond * 2000
testPush(q, qps, delay, drop, tm) testPush(q, qps, delay, drop, tm, accept)
fmt.Printf("qps %v process time %v drop %d timeout %d \n", int64(time.Second/qps), delay, *drop, *tm) fmt.Printf("qps %v process time %v drop %d timeout %d accept %d \n", int64(time.Second/qps), delay, *drop, *tm, *accept)
time.Sleep(time.Second) time.Sleep(time.Second)
} }
@ -42,21 +45,23 @@ func TestCoDel100(t *testing.T) {
q := New(testConf) q := New(testConf)
drop := new(int64) drop := new(int64)
tm := new(int64) tm := new(int64)
accept := new(int64)
delay := time.Millisecond * 1000 delay := time.Millisecond * 1000
testPush(q, qps, delay, drop, tm) testPush(q, qps, delay, drop, tm, accept)
fmt.Printf("qps %v process time %v drop %d timeout %d \n", int64(time.Second/qps), delay, *drop, *tm) fmt.Printf("qps %v process time %v drop %d timeout %d accept %d \n", int64(time.Second/qps), delay, *drop, *tm, *accept)
} }
func TestCoDel50(t *testing.T) { func TestCoDel50(t *testing.T) {
q := New(testConf) q := New(testConf)
drop := new(int64) drop := new(int64)
tm := new(int64) tm := new(int64)
delay := time.Millisecond * 500 accept := new(int64)
testPush(q, qps, delay, drop, tm) delay := time.Millisecond * 50
fmt.Printf("qps %v process time %v drop %d timeout %d \n", int64(time.Second/qps), delay, *drop, *tm) testPush(q, qps, delay, drop, tm, accept)
fmt.Printf("qps %v process time %v drop %d timeout %d accept %d \n", int64(time.Second/qps), delay, *drop, *tm, *accept)
} }
func testPush(q *Queue, sleep time.Duration, delay time.Duration, drop *int64, tm *int64) { func testPush(q *Queue, sleep time.Duration, delay time.Duration, drop *int64, tm *int64, accept *int64) {
var group sync.WaitGroup var group sync.WaitGroup
for i := 0; i < 5000; i++ { for i := 0; i < 5000; i++ {
time.Sleep(sleep) time.Sleep(sleep)
@ -72,6 +77,7 @@ func testPush(q *Queue, sleep time.Duration, delay time.Duration, drop *int64, t
atomic.AddInt64(tm, 1) atomic.AddInt64(tm, 1)
} }
} else { } else {
atomic.AddInt64(accept, 1)
time.Sleep(delay) time.Sleep(delay)
q.Pop() q.Pop()
} }

Loading…
Cancel
Save