* Update node.go

* Update node.go

* Update node.go

* Update node.go
pull/2021/head
songzhibin97 3 years ago committed by GitHub
parent 92ba6a3209
commit 0c511808ce
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 65
      selector/node/ewma/node.go

@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/go-kratos/kratos/v2/errors" "github.com/go-kratos/kratos/v2/errors"
"github.com/go-kratos/kratos/v2/selector" "github.com/go-kratos/kratos/v2/selector"
) )
@ -75,45 +76,44 @@ func (n *Node) load() (load uint64) {
predictInterval := avgLag / 5 predictInterval := avgLag / 5
if predictInterval < int64(time.Millisecond*5) { if predictInterval < int64(time.Millisecond*5) {
predictInterval = int64(time.Millisecond * 5) predictInterval = int64(time.Millisecond * 5)
} else if predictInterval > int64(time.Millisecond*200) { }
if predictInterval > int64(time.Millisecond*200) {
predictInterval = int64(time.Millisecond * 200) predictInterval = int64(time.Millisecond * 200)
} }
if now-lastPredictTs > predictInterval { if now-lastPredictTs > predictInterval && atomic.CompareAndSwapInt64(&n.predictTs, lastPredictTs, now) {
if atomic.CompareAndSwapInt64(&n.predictTs, lastPredictTs, now) { var (
var ( total int64
total int64 count int
count int predict int64
predict int64 )
) n.lk.RLock()
n.lk.RLock() first := n.inflights.Front()
first := n.inflights.Front() for first != nil {
for first != nil { lag := now - first.Value.(int64)
lag := now - first.Value.(int64) if lag > avgLag {
if lag > avgLag { count++
count++ total += lag
total += lag
}
first = first.Next()
} }
if count > (n.inflights.Len()/2 + 1) { first = first.Next()
predict = total / int64(count) }
} if count > (n.inflights.Len()/2 + 1) {
n.lk.RUnlock() predict = total / int64(count)
atomic.StoreInt64(&n.predict, predict)
} }
n.lk.RUnlock()
atomic.StoreInt64(&n.predict, predict)
} }
if avgLag == 0 { if avgLag == 0 {
// penalty is the penalty value when there is no data when the node is just started. // penalty is the penalty value when there is no data when the node is just started.
// The default value is 1e9 * 10 // The default value is 1e9 * 10
load = penalty * uint64(atomic.LoadInt64(&n.inflight)) load = penalty * uint64(atomic.LoadInt64(&n.inflight))
} else { return
predict := atomic.LoadInt64(&n.predict) }
if predict > avgLag { predict := atomic.LoadInt64(&n.predict)
avgLag = predict if predict > avgLag {
} avgLag = predict
load = uint64(avgLag) * uint64(atomic.LoadInt64(&n.inflight))
} }
load = uint64(avgLag) * uint64(atomic.LoadInt64(&n.inflight))
return return
} }
@ -155,11 +155,10 @@ func (n *Node) Pick() selector.DoneFunc {
success := uint64(1000) // error value ,if error set 1 success := uint64(1000) // error value ,if error set 1
if di.Err != nil { if di.Err != nil {
if n.errHandler != nil { if n.errHandler != nil && n.errHandler(di.Err) {
if n.errHandler(di.Err) { success = 0
success = 0 }
} if errors.Is(context.DeadlineExceeded, di.Err) || errors.Is(context.Canceled, di.Err) ||
} else if errors.Is(context.DeadlineExceeded, di.Err) || errors.Is(context.Canceled, di.Err) ||
errors.IsServiceUnavailable(di.Err) || errors.IsGatewayTimeout(di.Err) { errors.IsServiceUnavailable(di.Err) || errors.IsGatewayTimeout(di.Err) {
success = 0 success = 0
} }

Loading…
Cancel
Save