From b0159d695e45174c09b763b681919e9a5082f5aa Mon Sep 17 00:00:00 2001 From: Windfarer Date: Wed, 9 Sep 2020 11:27:43 +0800 Subject: [PATCH] fix trace pending --- pkg/cache/redis/errors.go | 7 +++++ pkg/cache/redis/trace.go | 52 +++++++++++++++++++++-------------- pkg/cache/redis/trace_test.go | 20 ++++++++++++++ 3 files changed, 58 insertions(+), 21 deletions(-) diff --git a/pkg/cache/redis/errors.go b/pkg/cache/redis/errors.go index e3bd5c339..6be483ff9 100644 --- a/pkg/cache/redis/errors.go +++ b/pkg/cache/redis/errors.go @@ -20,6 +20,9 @@ func formatErr(err error, name, addr string) string { case strings.HasPrefix(es, "read"): return "read timeout" case strings.HasPrefix(es, "dial"): + if strings.Contains(es, "connection refused") { + return "connection refused" + } return "dial timeout" case strings.HasPrefix(es, "write"): return "write timeout" @@ -29,6 +32,10 @@ func formatErr(err error, name, addr string) string { return "reset" case strings.Contains(es, "broken"): return "broken pipe" + case strings.Contains(es, "pool exhausted"): + return "pool exhausted" + case strings.Contains(es, "pool closed"): + return "pool closed" default: return "unexpected err" } diff --git a/pkg/cache/redis/trace.go b/pkg/cache/redis/trace.go index 90144ce34..656b616c0 100644 --- a/pkg/cache/redis/trace.go +++ b/pkg/cache/redis/trace.go @@ -22,13 +22,14 @@ var _internalTags = []trace.Tag{ } type traceConn struct { - // tr for pipeline, if tr != nil meaning on pipeline + // tr parent trace. tr trace.Trace + // trPipe for pipeline, if trPipe != nil meaning on pipeline. + trPipe trace.Trace + // connTag include e.g. ip,port connTags []trace.Tag - ctx context.Context - // origin redis conn Conn pending int @@ -39,9 +40,15 @@ type traceConn struct { func (t *traceConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) { statement := getStatement(commandName, args...) defer t.slowLog(statement, time.Now()) + // NOTE: ignored empty commandName // current sdk will Do empty command after pipeline finished - if t.tr == nil || commandName == "" { + if commandName == "" { + t.pending = 0 + t.trPipe = nil + return t.Conn.Do(commandName, args...) + } + if t.tr == nil { return t.Conn.Do(commandName, args...) } tr := t.tr.Fork("", "Redis:"+commandName) @@ -60,18 +67,19 @@ func (t *traceConn) Send(commandName string, args ...interface{}) (err error) { if t.tr == nil { return t.Conn.Send(commandName, args...) } - if t.pending == 1 { - t.tr = t.tr.Fork("", "Redis:Pipeline") - t.tr.SetTag(_internalTags...) - t.tr.SetTag(t.connTags...) + + if t.trPipe == nil { + t.trPipe = t.tr.Fork("", "Redis:Pipeline") + t.trPipe.SetTag(_internalTags...) + t.trPipe.SetTag(t.connTags...) } - t.tr.SetLog( + t.trPipe.SetLog( trace.Log(trace.LogEvent, "Send"), trace.Log("db.statement", statement), ) if err = t.Conn.Send(commandName, args...); err != nil { - t.tr.SetTag(trace.TagBool(trace.TagError, true)) - t.tr.SetLog( + t.trPipe.SetTag(trace.TagBool(trace.TagError, true)) + t.trPipe.SetLog( trace.Log(trace.LogEvent, "Send Fail"), trace.Log(trace.LogMessage, err.Error()), ) @@ -81,14 +89,14 @@ func (t *traceConn) Send(commandName string, args ...interface{}) (err error) { func (t *traceConn) Flush() error { defer t.slowLog("Flush", time.Now()) - if t.tr == nil { + if t.trPipe == nil { return t.Conn.Flush() } - t.tr.SetLog(trace.Log(trace.LogEvent, "Flush")) + t.trPipe.SetLog(trace.Log(trace.LogEvent, "Flush")) err := t.Conn.Flush() if err != nil { - t.tr.SetTag(trace.TagBool(trace.TagError, true)) - t.tr.SetLog( + t.trPipe.SetTag(trace.TagBool(trace.TagError, true)) + t.trPipe.SetLog( trace.Log(trace.LogEvent, "Flush Fail"), trace.Log(trace.LogMessage, err.Error()), ) @@ -98,14 +106,14 @@ func (t *traceConn) Flush() error { func (t *traceConn) Receive() (reply interface{}, err error) { defer t.slowLog("Receive", time.Now()) - if t.tr == nil { + if t.trPipe == nil { return t.Conn.Receive() } - t.tr.SetLog(trace.Log(trace.LogEvent, "Receive")) + t.trPipe.SetLog(trace.Log(trace.LogEvent, "Receive")) reply, err = t.Conn.Receive() if err != nil { - t.tr.SetTag(trace.TagBool(trace.TagError, true)) - t.tr.SetLog( + t.trPipe.SetTag(trace.TagBool(trace.TagError, true)) + t.trPipe.SetLog( trace.Log(trace.LogEvent, "Receive Fail"), trace.Log(trace.LogMessage, err.Error()), ) @@ -114,8 +122,8 @@ func (t *traceConn) Receive() (reply interface{}, err error) { t.pending-- } if t.pending == 0 { - t.tr.Finish(nil) - t.tr = nil + t.trPipe.Finish(nil) + t.trPipe = nil } return reply, err } @@ -123,6 +131,8 @@ func (t *traceConn) Receive() (reply interface{}, err error) { func (t *traceConn) WithContext(ctx context.Context) Conn { t.Conn = t.Conn.WithContext(ctx) t.tr, _ = trace.FromContext(ctx) + t.pending = 0 + t.trPipe = nil return t } diff --git a/pkg/cache/redis/trace_test.go b/pkg/cache/redis/trace_test.go index c9502b808..412bf7ce3 100644 --- a/pkg/cache/redis/trace_test.go +++ b/pkg/cache/redis/trace_test.go @@ -190,3 +190,23 @@ func BenchmarkTraceConn(b *testing.B) { c2.Close() } } + +func TestTraceConnPending(t *testing.T) { + c, err := DialDefaultServer() + if err != nil { + t.Fatal(err) + } + tc := &traceConn{ + Conn: c, + connTags: []trace.Tag{trace.TagString(trace.TagPeerAddress, "abc")}, + slowLogThreshold: time.Duration(1 * time.Second), + } + err = tc.Send("SET", "a", "x") + if err != nil { + t.Fatal(err) + } + tc.Close() + assert.Equal(t, 1, tc.pending) + tc.Do("") + assert.Equal(t, 0, tc.pending) +}