From 57a614077fded7bf536545c109266b868be1d39f Mon Sep 17 00:00:00 2001 From: kevin Date: Tue, 16 Jul 2019 15:29:18 +0800 Subject: [PATCH 1/3] avoid data race while using Rand in sre_breaker.go --- pkg/net/netutil/breaker/sre_breaker.go | 26 +++++++++++++++------ pkg/net/netutil/breaker/sre_breaker_test.go | 18 ++++++++++++-- 2 files changed, 35 insertions(+), 9 deletions(-) diff --git a/pkg/net/netutil/breaker/sre_breaker.go b/pkg/net/netutil/breaker/sre_breaker.go index 5e410cd98..ef4c32856 100644 --- a/pkg/net/netutil/breaker/sre_breaker.go +++ b/pkg/net/netutil/breaker/sre_breaker.go @@ -3,6 +3,7 @@ package breaker import ( "math" "math/rand" + "sync" "sync/atomic" "time" @@ -11,6 +12,12 @@ import ( "github.com/bilibili/kratos/pkg/stat/metric" ) +var ( + // rand.New(...) returns a non thread safe object + random = rand.New(rand.NewSource(time.Now().UnixNano())) + lock sync.Mutex +) + // sreBreaker is a sre CircuitBreaker pattern. type sreBreaker struct { stat metric.RollingCounter @@ -19,7 +26,6 @@ type sreBreaker struct { request int64 state int32 - r *rand.Rand } func newSRE(c *Config) Breaker { @@ -30,7 +36,6 @@ func newSRE(c *Config) Breaker { stat := metric.NewRollingCounter(counterOpts) return &sreBreaker{ stat: stat, - r: rand.New(rand.NewSource(time.Now().UnixNano())), request: c.Request, k: c.K, @@ -69,14 +74,14 @@ func (b *sreBreaker) Allow() error { atomic.CompareAndSwapInt32(&b.state, StateClosed, StateOpen) } dr := math.Max(0, (float64(total)-k)/float64(total+1)) - rr := b.r.Float64() + drop := trueOnProba(dr) if log.V(5) { - log.Info("breaker: drop ratio: %f, real rand: %f, drop: %v", dr, rr, dr > rr) + log.Info("breaker: drop ratio: %f, drop: %t", dr, drop) } - if dr <= rr { - return nil + if drop { + return ecode.ServiceUnavailable } - return ecode.ServiceUnavailable + return nil } func (b *sreBreaker) MarkSuccess() { @@ -88,3 +93,10 @@ func (b *sreBreaker) MarkFailed() { // drop ratio higher. b.stat.Add(0) } + +func trueOnProba(proba float64) (truth bool) { + lock.Lock() + truth = random.Float64() < proba + lock.Unlock() + return +} diff --git a/pkg/net/netutil/breaker/sre_breaker_test.go b/pkg/net/netutil/breaker/sre_breaker_test.go index 4559c4fc5..bd1849d74 100644 --- a/pkg/net/netutil/breaker/sre_breaker_test.go +++ b/pkg/net/netutil/breaker/sre_breaker_test.go @@ -5,7 +5,7 @@ import ( "math/rand" "testing" "time" - + "github.com/bilibili/kratos/pkg/stat/metric" xtime "github.com/bilibili/kratos/pkg/time" @@ -29,7 +29,6 @@ func getSREBreaker() *sreBreaker { stat := metric.NewRollingCounter(counterOpts) return &sreBreaker{ stat: stat, - r: rand.New(rand.NewSource(time.Now().UnixNano())), request: 100, k: 2, @@ -147,6 +146,21 @@ func TestSRESummary(t *testing.T) { }) } +func TestTrueOnProba(t *testing.T) { + const proba = math.Pi / 10 + const total = 100000 + const epsilon = 0.05 + var count int + for i := 0; i < total; i++ { + if trueOnProba(proba) { + count++ + } + } + + ratio := float64(count) / float64(total) + assert.InEpsilon(t, proba, ratio, epsilon) +} + func BenchmarkSreBreakerAllow(b *testing.B) { breaker := getSRE() b.ResetTimer() From 76bfc4250c0a0512c1feb961ab90b4981cfab01d Mon Sep 17 00:00:00 2001 From: kevin Date: Sun, 21 Jul 2019 16:30:52 +0800 Subject: [PATCH 2/3] move lock into sreBreaker from outside for better performance --- pkg/net/netutil/breaker/sre_breaker.go | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/pkg/net/netutil/breaker/sre_breaker.go b/pkg/net/netutil/breaker/sre_breaker.go index ef4c32856..4627369ff 100644 --- a/pkg/net/netutil/breaker/sre_breaker.go +++ b/pkg/net/netutil/breaker/sre_breaker.go @@ -12,12 +12,6 @@ import ( "github.com/bilibili/kratos/pkg/stat/metric" ) -var ( - // rand.New(...) returns a non thread safe object - random = rand.New(rand.NewSource(time.Now().UnixNano())) - lock sync.Mutex -) - // sreBreaker is a sre CircuitBreaker pattern. type sreBreaker struct { stat metric.RollingCounter @@ -26,6 +20,10 @@ type sreBreaker struct { request int64 state int32 + + random *rand.Rand + // rand.New(...) returns a non thread safe object + randLock sync.Mutex } func newSRE(c *Config) Breaker { @@ -40,6 +38,7 @@ func newSRE(c *Config) Breaker { request: c.Request, k: c.K, state: StateClosed, + random: rand.New(rand.NewSource(time.Now().UnixNano())), } } @@ -74,7 +73,7 @@ func (b *sreBreaker) Allow() error { atomic.CompareAndSwapInt32(&b.state, StateClosed, StateOpen) } dr := math.Max(0, (float64(total)-k)/float64(total+1)) - drop := trueOnProba(dr) + drop := b.trueOnProba(dr) if log.V(5) { log.Info("breaker: drop ratio: %f, drop: %t", dr, drop) } @@ -94,9 +93,9 @@ func (b *sreBreaker) MarkFailed() { b.stat.Add(0) } -func trueOnProba(proba float64) (truth bool) { - lock.Lock() - truth = random.Float64() < proba - lock.Unlock() +func (b *sreBreaker) trueOnProba(proba float64) (truth bool) { + b.randLock.Lock() + truth = b.random.Float64() < proba + b.randLock.Unlock() return } From 34b80a81a4b254008960a2a07e2b7da40bacbb8c Mon Sep 17 00:00:00 2001 From: kevin Date: Sun, 21 Jul 2019 16:39:48 +0800 Subject: [PATCH 3/3] fix test fails --- pkg/net/netutil/breaker/sre_breaker.go | 11 +++++------ pkg/net/netutil/breaker/sre_breaker_test.go | 4 +++- 2 files changed, 8 insertions(+), 7 deletions(-) diff --git a/pkg/net/netutil/breaker/sre_breaker.go b/pkg/net/netutil/breaker/sre_breaker.go index 4627369ff..44ccb4b71 100644 --- a/pkg/net/netutil/breaker/sre_breaker.go +++ b/pkg/net/netutil/breaker/sre_breaker.go @@ -15,15 +15,14 @@ import ( // sreBreaker is a sre CircuitBreaker pattern. type sreBreaker struct { stat metric.RollingCounter + r *rand.Rand + // rand.New(...) returns a non thread safe object + randLock sync.Mutex k float64 request int64 state int32 - - random *rand.Rand - // rand.New(...) returns a non thread safe object - randLock sync.Mutex } func newSRE(c *Config) Breaker { @@ -34,11 +33,11 @@ func newSRE(c *Config) Breaker { stat := metric.NewRollingCounter(counterOpts) return &sreBreaker{ stat: stat, + r: rand.New(rand.NewSource(time.Now().UnixNano())), request: c.Request, k: c.K, state: StateClosed, - random: rand.New(rand.NewSource(time.Now().UnixNano())), } } @@ -95,7 +94,7 @@ func (b *sreBreaker) MarkFailed() { func (b *sreBreaker) trueOnProba(proba float64) (truth bool) { b.randLock.Lock() - truth = b.random.Float64() < proba + truth = b.r.Float64() < proba b.randLock.Unlock() return } diff --git a/pkg/net/netutil/breaker/sre_breaker_test.go b/pkg/net/netutil/breaker/sre_breaker_test.go index bd1849d74..9df374736 100644 --- a/pkg/net/netutil/breaker/sre_breaker_test.go +++ b/pkg/net/netutil/breaker/sre_breaker_test.go @@ -29,6 +29,7 @@ func getSREBreaker() *sreBreaker { stat := metric.NewRollingCounter(counterOpts) return &sreBreaker{ stat: stat, + r: rand.New(rand.NewSource(time.Now().UnixNano())), request: 100, k: 2, @@ -151,8 +152,9 @@ func TestTrueOnProba(t *testing.T) { const total = 100000 const epsilon = 0.05 var count int + b := getSREBreaker() for i := 0; i < total; i++ { - if trueOnProba(proba) { + if b.trueOnProba(proba) { count++ } }