diff --git a/pkg/net/netutil/breaker/sre_breaker.go b/pkg/net/netutil/breaker/sre_breaker.go index 5e410cd98..44ccb4b71 100644 --- a/pkg/net/netutil/breaker/sre_breaker.go +++ b/pkg/net/netutil/breaker/sre_breaker.go @@ -3,6 +3,7 @@ package breaker import ( "math" "math/rand" + "sync" "sync/atomic" "time" @@ -14,12 +15,14 @@ import ( // sreBreaker is a sre CircuitBreaker pattern. type sreBreaker struct { stat metric.RollingCounter + r *rand.Rand + // rand.New(...) returns a non thread safe object + randLock sync.Mutex k float64 request int64 state int32 - r *rand.Rand } func newSRE(c *Config) Breaker { @@ -69,14 +72,14 @@ func (b *sreBreaker) Allow() error { atomic.CompareAndSwapInt32(&b.state, StateClosed, StateOpen) } dr := math.Max(0, (float64(total)-k)/float64(total+1)) - rr := b.r.Float64() + drop := b.trueOnProba(dr) if log.V(5) { - log.Info("breaker: drop ratio: %f, real rand: %f, drop: %v", dr, rr, dr > rr) + log.Info("breaker: drop ratio: %f, drop: %t", dr, drop) } - if dr <= rr { - return nil + if drop { + return ecode.ServiceUnavailable } - return ecode.ServiceUnavailable + return nil } func (b *sreBreaker) MarkSuccess() { @@ -88,3 +91,10 @@ func (b *sreBreaker) MarkFailed() { // drop ratio higher. b.stat.Add(0) } + +func (b *sreBreaker) trueOnProba(proba float64) (truth bool) { + b.randLock.Lock() + truth = b.r.Float64() < proba + b.randLock.Unlock() + return +} diff --git a/pkg/net/netutil/breaker/sre_breaker_test.go b/pkg/net/netutil/breaker/sre_breaker_test.go index 4559c4fc5..9df374736 100644 --- a/pkg/net/netutil/breaker/sre_breaker_test.go +++ b/pkg/net/netutil/breaker/sre_breaker_test.go @@ -5,7 +5,7 @@ import ( "math/rand" "testing" "time" - + "github.com/bilibili/kratos/pkg/stat/metric" xtime "github.com/bilibili/kratos/pkg/time" @@ -147,6 +147,22 @@ func TestSRESummary(t *testing.T) { }) } +func TestTrueOnProba(t *testing.T) { + const proba = math.Pi / 10 + const total = 100000 + const epsilon = 0.05 + var count int + b := getSREBreaker() + for i := 0; i < total; i++ { + if b.trueOnProba(proba) { + count++ + } + } + + ratio := float64(count) / float64(total) + assert.InEpsilon(t, proba, ratio, epsilon) +} + func BenchmarkSreBreakerAllow(b *testing.B) { breaker := getSRE() b.ResetTimer()