avoid data race while using Rand in sre_breaker.go

pull/211/head
kevin 6 years ago
parent 382bbf3c4f
commit 57a614077f
  1. 26
      pkg/net/netutil/breaker/sre_breaker.go
  2. 16
      pkg/net/netutil/breaker/sre_breaker_test.go

@ -3,6 +3,7 @@ package breaker
import ( import (
"math" "math"
"math/rand" "math/rand"
"sync"
"sync/atomic" "sync/atomic"
"time" "time"
@ -11,6 +12,12 @@ import (
"github.com/bilibili/kratos/pkg/stat/metric" "github.com/bilibili/kratos/pkg/stat/metric"
) )
var (
// rand.New(...) returns a non thread safe object
random = rand.New(rand.NewSource(time.Now().UnixNano()))
lock sync.Mutex
)
// sreBreaker is a sre CircuitBreaker pattern. // sreBreaker is a sre CircuitBreaker pattern.
type sreBreaker struct { type sreBreaker struct {
stat metric.RollingCounter stat metric.RollingCounter
@ -19,7 +26,6 @@ type sreBreaker struct {
request int64 request int64
state int32 state int32
r *rand.Rand
} }
func newSRE(c *Config) Breaker { func newSRE(c *Config) Breaker {
@ -30,7 +36,6 @@ func newSRE(c *Config) Breaker {
stat := metric.NewRollingCounter(counterOpts) stat := metric.NewRollingCounter(counterOpts)
return &sreBreaker{ return &sreBreaker{
stat: stat, stat: stat,
r: rand.New(rand.NewSource(time.Now().UnixNano())),
request: c.Request, request: c.Request,
k: c.K, k: c.K,
@ -69,14 +74,14 @@ func (b *sreBreaker) Allow() error {
atomic.CompareAndSwapInt32(&b.state, StateClosed, StateOpen) atomic.CompareAndSwapInt32(&b.state, StateClosed, StateOpen)
} }
dr := math.Max(0, (float64(total)-k)/float64(total+1)) dr := math.Max(0, (float64(total)-k)/float64(total+1))
rr := b.r.Float64() drop := trueOnProba(dr)
if log.V(5) { if log.V(5) {
log.Info("breaker: drop ratio: %f, real rand: %f, drop: %v", dr, rr, dr > rr) log.Info("breaker: drop ratio: %f, drop: %t", dr, drop)
} }
if dr <= rr { if drop {
return nil return ecode.ServiceUnavailable
} }
return ecode.ServiceUnavailable return nil
} }
func (b *sreBreaker) MarkSuccess() { func (b *sreBreaker) MarkSuccess() {
@ -88,3 +93,10 @@ func (b *sreBreaker) MarkFailed() {
// drop ratio higher. // drop ratio higher.
b.stat.Add(0) b.stat.Add(0)
} }
func trueOnProba(proba float64) (truth bool) {
lock.Lock()
truth = random.Float64() < proba
lock.Unlock()
return
}

@ -29,7 +29,6 @@ func getSREBreaker() *sreBreaker {
stat := metric.NewRollingCounter(counterOpts) stat := metric.NewRollingCounter(counterOpts)
return &sreBreaker{ return &sreBreaker{
stat: stat, stat: stat,
r: rand.New(rand.NewSource(time.Now().UnixNano())),
request: 100, request: 100,
k: 2, k: 2,
@ -147,6 +146,21 @@ func TestSRESummary(t *testing.T) {
}) })
} }
func TestTrueOnProba(t *testing.T) {
const proba = math.Pi / 10
const total = 100000
const epsilon = 0.05
var count int
for i := 0; i < total; i++ {
if trueOnProba(proba) {
count++
}
}
ratio := float64(count) / float64(total)
assert.InEpsilon(t, proba, ratio, epsilon)
}
func BenchmarkSreBreakerAllow(b *testing.B) { func BenchmarkSreBreakerAllow(b *testing.B) {
breaker := getSRE() breaker := getSRE()
b.ResetTimer() b.ResetTimer()

Loading…
Cancel
Save