From 4a93aa9b8d5dca550cc60a0c51c4726f83a2e6f8 Mon Sep 17 00:00:00 2001 From: longXboy Date: Thu, 30 May 2019 15:38:08 +0800 Subject: [PATCH] add filter for p2c --- pkg/net/rpc/warden/CHANGELOG.md | 5 ++- pkg/net/rpc/warden/balancer/p2c/p2c.go | 46 ++++++++++++++++++++------ 2 files changed, 39 insertions(+), 12 deletions(-) diff --git a/pkg/net/rpc/warden/CHANGELOG.md b/pkg/net/rpc/warden/CHANGELOG.md index f249e24e4..ebaff68ad 100644 --- a/pkg/net/rpc/warden/CHANGELOG.md +++ b/pkg/net/rpc/warden/CHANGELOG.md @@ -1,6 +1,9 @@ ### net/rpc/warden -##### Version 1.1.12 +##### Version 1.1.14 +1. p2c balancer增加filter保护 + +##### Version 1.1.13 1. 设置 caller 为 no_user 如果 user 不存在 ##### Version 1.1.12 diff --git a/pkg/net/rpc/warden/balancer/p2c/p2c.go b/pkg/net/rpc/warden/balancer/p2c/p2c.go index b6ea90309..cdb455523 100644 --- a/pkg/net/rpc/warden/balancer/p2c/p2c.go +++ b/pkg/net/rpc/warden/balancer/p2c/p2c.go @@ -67,10 +67,24 @@ type subConn struct { reqs int64 } +func (sc *subConn) valid() bool { + return sc.health() > 500 && atomic.LoadUint64(&sc.svrCPU) < 900 +} + func (sc *subConn) health() uint64 { return atomic.LoadUint64(&sc.success) } +func (sc *subConn) load() uint64 { + lag := uint64(math.Sqrt(float64(atomic.LoadUint64(&sc.lag))) + 1) + load := atomic.LoadUint64(&sc.svrCPU) * lag * uint64(atomic.LoadInt64(&sc.inflight)) + if load == 0 { + // penalty是初始化没有数据时的惩罚值,默认为1e9 * 250 + load = penalty + } + return load +} + func (sc *subConn) cost() uint64 { load := atomic.LoadUint64(&sc.svrCPU) * atomic.LoadUint64(&sc.lag) * uint64(atomic.LoadInt64(&sc.inflight)) if load == 0 { @@ -155,6 +169,24 @@ func (p *p2cPicker) Pick(ctx context.Context, opts balancer.PickOptions) (balanc return p.pick(ctx, opts) } +// choose two distinct nodes +func (p *p2cPicker) prePick() (nodeA *subConn, nodeB *subConn) { + for i := 0; i < 3; i++ { + p.lk.Lock() + a := p.r.Intn(len(p.subConns)) + b := p.r.Intn(len(p.subConns) - 1) + p.lk.Unlock() + if b >= a { + b = b + 1 + } + nodeA, nodeB = p.subConns[a], p.subConns[b] + if nodeA.valid() || nodeB.valid() { + break + } + } + return +} + func (p *p2cPicker) pick(ctx context.Context, opts balancer.PickOptions) (balancer.SubConn, func(balancer.DoneInfo), error) { var pc, upc *subConn start := time.Now().UnixNano() @@ -164,17 +196,9 @@ func (p *p2cPicker) pick(ctx context.Context, opts balancer.PickOptions) (balanc } else if len(p.subConns) == 1 { pc = p.subConns[0] } else { - // choose two distinct nodes - p.lk.Lock() - a := p.r.Intn(len(p.subConns)) - b := p.r.Intn(len(p.subConns) - 1) - p.lk.Unlock() - if b >= a { - b = b + 1 - } - nodeA, nodeB := p.subConns[a], p.subConns[b] + nodeA, nodeB := p.prePick() // meta.Weight为服务发布者在disocvery中设置的权重 - if nodeA.cost()*nodeB.health()*nodeB.meta.Weight > nodeB.cost()*nodeA.health()*nodeA.meta.Weight { + if nodeA.load()*nodeB.health()*nodeB.meta.Weight > nodeB.load()*nodeA.health()*nodeA.meta.Weight { pc, upc = nodeB, nodeA } else { pc, upc = nodeA, nodeB @@ -258,7 +282,7 @@ func (p *p2cPicker) printStats() { stat.inflight = atomic.LoadInt64(&conn.inflight) stat.lantency = atomic.LoadUint64(&conn.lag) stat.reqs = atomic.SwapInt64(&conn.reqs, 0) - load := stat.cpu * uint64(stat.inflight) * stat.lantency + load := conn.load() if load != 0 { stat.score = float64(stat.cs*conn.meta.Weight*1e8) / float64(load) }