* Update node.go

* Update node.go

* Update node.go

* Update node.go
status-code-override
songzhibin97 3 years ago committed by chenzhihui
parent 72ba4e5c43
commit 3a15f5df9e
  1. 17
      selector/node/ewma/node.go

@ -9,6 +9,7 @@ import (
"time" "time"
"github.com/go-kratos/kratos/v2/errors" "github.com/go-kratos/kratos/v2/errors"
"github.com/go-kratos/kratos/v2/selector" "github.com/go-kratos/kratos/v2/selector"
) )
@ -75,11 +76,11 @@ func (n *Node) load() (load uint64) {
predictInterval := avgLag / 5 predictInterval := avgLag / 5
if predictInterval < int64(time.Millisecond*5) { if predictInterval < int64(time.Millisecond*5) {
predictInterval = int64(time.Millisecond * 5) predictInterval = int64(time.Millisecond * 5)
} else if predictInterval > int64(time.Millisecond*200) { }
if predictInterval > int64(time.Millisecond*200) {
predictInterval = int64(time.Millisecond * 200) predictInterval = int64(time.Millisecond * 200)
} }
if now-lastPredictTs > predictInterval { if now-lastPredictTs > predictInterval && atomic.CompareAndSwapInt64(&n.predictTs, lastPredictTs, now) {
if atomic.CompareAndSwapInt64(&n.predictTs, lastPredictTs, now) {
var ( var (
total int64 total int64
count int count int
@ -101,19 +102,18 @@ func (n *Node) load() (load uint64) {
n.lk.RUnlock() n.lk.RUnlock()
atomic.StoreInt64(&n.predict, predict) atomic.StoreInt64(&n.predict, predict)
} }
}
if avgLag == 0 { if avgLag == 0 {
// penalty is the penalty value when there is no data when the node is just started. // penalty is the penalty value when there is no data when the node is just started.
// The default value is 1e9 * 10 // The default value is 1e9 * 10
load = penalty * uint64(atomic.LoadInt64(&n.inflight)) load = penalty * uint64(atomic.LoadInt64(&n.inflight))
} else { return
}
predict := atomic.LoadInt64(&n.predict) predict := atomic.LoadInt64(&n.predict)
if predict > avgLag { if predict > avgLag {
avgLag = predict avgLag = predict
} }
load = uint64(avgLag) * uint64(atomic.LoadInt64(&n.inflight)) load = uint64(avgLag) * uint64(atomic.LoadInt64(&n.inflight))
}
return return
} }
@ -155,11 +155,10 @@ func (n *Node) Pick() selector.DoneFunc {
success := uint64(1000) // error value ,if error set 1 success := uint64(1000) // error value ,if error set 1
if di.Err != nil { if di.Err != nil {
if n.errHandler != nil { if n.errHandler != nil && n.errHandler(di.Err) {
if n.errHandler(di.Err) {
success = 0 success = 0
} }
} else if errors.Is(context.DeadlineExceeded, di.Err) || errors.Is(context.Canceled, di.Err) || if errors.Is(context.DeadlineExceeded, di.Err) || errors.Is(context.Canceled, di.Err) ||
errors.IsServiceUnavailable(di.Err) || errors.IsGatewayTimeout(di.Err) { errors.IsServiceUnavailable(di.Err) || errors.IsGatewayTimeout(di.Err) {
success = 0 success = 0
} }

Loading…
Cancel
Save