add cache (#7)

pull/13/head
Sam 6 years ago committed by Felix Hao
parent 038b0ebb27
commit 7c6e0ea7ba
  1. 34
      pkg/cache/memcache/README.md
  2. 187
      pkg/cache/memcache/client.go
  3. 685
      pkg/cache/memcache/conn.go
  4. 76
      pkg/cache/memcache/errors.go
  5. 136
      pkg/cache/memcache/memcache.go
  6. 59
      pkg/cache/memcache/mock.go
  7. 197
      pkg/cache/memcache/pool.go
  8. 109
      pkg/cache/memcache/trace.go
  9. 32
      pkg/cache/memcache/util.go
  10. 49
      pkg/cache/redis/README.markdown
  11. 57
      pkg/cache/redis/commandinfo.go
  12. 597
      pkg/cache/redis/conn.go
  13. 169
      pkg/cache/redis/doc.go
  14. 33
      pkg/cache/redis/errors.go
  15. 117
      pkg/cache/redis/log.go
  16. 36
      pkg/cache/redis/mock.go
  17. 218
      pkg/cache/redis/pool.go
  18. 152
      pkg/cache/redis/pubsub.go
  19. 51
      pkg/cache/redis/redis.go
  20. 409
      pkg/cache/redis/reply.go
  21. 559
      pkg/cache/redis/scan.go
  22. 86
      pkg/cache/redis/script.go
  23. 142
      pkg/cache/redis/trace.go

@ -0,0 +1,34 @@
# go-common/cache/memcache
##### 项目简介
> 1. 提供protobuf,gob,json序列化方式,gzip的memcache接口
##### 编译环境
> 1. 请只用golang v1.7.x以上版本编译执行。
##### 测试
> 1. 执行当前目录下所有测试文件,测试所有功能
##### 特别说明
> 1. 使用protobuf需要在pb文件目录下运行business/make.sh脚本生成go文件才能使用
#### 使用方式
```golang
// 初始化 注意这里只是示例 展示用法 不能每次都New 只需要初始化一次
mc := memcache.New(&memcache.Config{})
// 程序关闭的时候调用close方法
defer mc.Close()
// 增加 key
err = mc.Set(c, &memcache.Item{})
// 删除key
err := mc.Delete(c,key)
// 获得某个key的内容
err := mc.Get(c,key).Scan(&v)
// 获取多个key的内容
replies, err := mc.GetMulti(c, keys)
for _, key := range replies.Keys() {
if err = replies.Scan(key, &v); err != nil {
return
}
}
```

@ -0,0 +1,187 @@
package memcache
import (
"context"
)
// Memcache memcache client
type Memcache struct {
pool *Pool
}
// Reply is the result of Get
type Reply struct {
err error
item *Item
conn Conn
closed bool
}
// Replies is the result of GetMulti
type Replies struct {
err error
items map[string]*Item
usedItems map[string]struct{}
conn Conn
closed bool
}
// New get a memcache client
func New(c *Config) *Memcache {
return &Memcache{pool: NewPool(c)}
}
// Close close connection pool
func (mc *Memcache) Close() error {
return mc.pool.Close()
}
// Conn direct get a connection
func (mc *Memcache) Conn(c context.Context) Conn {
return mc.pool.Get(c)
}
// Set writes the given item, unconditionally.
func (mc *Memcache) Set(c context.Context, item *Item) (err error) {
conn := mc.pool.Get(c)
err = conn.Set(item)
conn.Close()
return
}
// Add writes the given item, if no value already exists for its key.
// ErrNotStored is returned if that condition is not met.
func (mc *Memcache) Add(c context.Context, item *Item) (err error) {
conn := mc.pool.Get(c)
err = conn.Add(item)
conn.Close()
return
}
// Replace writes the given item, but only if the server *does* already hold data for this key.
func (mc *Memcache) Replace(c context.Context, item *Item) (err error) {
conn := mc.pool.Get(c)
err = conn.Replace(item)
conn.Close()
return
}
// CompareAndSwap writes the given item that was previously returned by Get
func (mc *Memcache) CompareAndSwap(c context.Context, item *Item) (err error) {
conn := mc.pool.Get(c)
err = conn.CompareAndSwap(item)
conn.Close()
return
}
// Get sends a command to the server for gets data.
func (mc *Memcache) Get(c context.Context, key string) *Reply {
conn := mc.pool.Get(c)
item, err := conn.Get(key)
if err != nil {
conn.Close()
}
return &Reply{err: err, item: item, conn: conn}
}
// Item get raw Item
func (r *Reply) Item() *Item {
return r.item
}
// Scan converts value, read from the memcache
func (r *Reply) Scan(v interface{}) (err error) {
if r.err != nil {
return r.err
}
err = r.conn.Scan(r.item, v)
if !r.closed {
r.conn.Close()
r.closed = true
}
return
}
// GetMulti is a batch version of Get
func (mc *Memcache) GetMulti(c context.Context, keys []string) (*Replies, error) {
conn := mc.pool.Get(c)
items, err := conn.GetMulti(keys)
rs := &Replies{err: err, items: items, conn: conn, usedItems: make(map[string]struct{}, len(keys))}
if (err != nil) || (len(items) == 0) {
rs.Close()
}
return rs, err
}
// Close close rows.
func (rs *Replies) Close() (err error) {
if !rs.closed {
err = rs.conn.Close()
rs.closed = true
}
return
}
// Item get Item from rows
func (rs *Replies) Item(key string) *Item {
return rs.items[key]
}
// Scan converts value, read from key in rows
func (rs *Replies) Scan(key string, v interface{}) (err error) {
if rs.err != nil {
return rs.err
}
item, ok := rs.items[key]
if !ok {
rs.Close()
return ErrNotFound
}
rs.usedItems[key] = struct{}{}
err = rs.conn.Scan(item, v)
if (err != nil) || (len(rs.items) == len(rs.usedItems)) {
rs.Close()
}
return
}
// Keys keys of result
func (rs *Replies) Keys() (keys []string) {
keys = make([]string, 0, len(rs.items))
for key := range rs.items {
keys = append(keys, key)
}
return
}
// Touch updates the expiry for the given key.
func (mc *Memcache) Touch(c context.Context, key string, timeout int32) (err error) {
conn := mc.pool.Get(c)
err = conn.Touch(key, timeout)
conn.Close()
return
}
// Delete deletes the item with the provided key.
func (mc *Memcache) Delete(c context.Context, key string) (err error) {
conn := mc.pool.Get(c)
err = conn.Delete(key)
conn.Close()
return
}
// Increment atomically increments key by delta.
func (mc *Memcache) Increment(c context.Context, key string, delta uint64) (newValue uint64, err error) {
conn := mc.pool.Get(c)
newValue, err = conn.Increment(key, delta)
conn.Close()
return
}
// Decrement atomically decrements key by delta.
func (mc *Memcache) Decrement(c context.Context, key string, delta uint64) (newValue uint64, err error) {
conn := mc.pool.Get(c)
newValue, err = conn.Decrement(key, delta)
conn.Close()
return
}

@ -0,0 +1,685 @@
package memcache
import (
"bufio"
"bytes"
"compress/gzip"
"context"
"encoding/gob"
"encoding/json"
"fmt"
"io"
"net"
"strconv"
"strings"
"sync"
"time"
"github.com/gogo/protobuf/proto"
pkgerr "github.com/pkg/errors"
)
var (
crlf = []byte("\r\n")
spaceStr = string(" ")
replyOK = []byte("OK\r\n")
replyStored = []byte("STORED\r\n")
replyNotStored = []byte("NOT_STORED\r\n")
replyExists = []byte("EXISTS\r\n")
replyNotFound = []byte("NOT_FOUND\r\n")
replyDeleted = []byte("DELETED\r\n")
replyEnd = []byte("END\r\n")
replyTouched = []byte("TOUCHED\r\n")
replyValueStr = "VALUE"
replyClientErrorPrefix = []byte("CLIENT_ERROR ")
replyServerErrorPrefix = []byte("SERVER_ERROR ")
)
const (
_encodeBuf = 4096 // 4kb
// 1024*1024 - 1, set error???
_largeValue = 1000 * 1000 // 1MB
)
type reader struct {
io.Reader
}
func (r *reader) Reset(rd io.Reader) {
r.Reader = rd
}
// conn is the low-level implementation of Conn
type conn struct {
// Shared
mu sync.Mutex
err error
conn net.Conn
// Read & Write
readTimeout time.Duration
writeTimeout time.Duration
rw *bufio.ReadWriter
// Item Reader
ir bytes.Reader
// Compress
gr gzip.Reader
gw *gzip.Writer
cb bytes.Buffer
// Encoding
edb bytes.Buffer
// json
jr reader
jd *json.Decoder
je *json.Encoder
// protobuffer
ped *proto.Buffer
}
// DialOption specifies an option for dialing a Memcache server.
type DialOption struct {
f func(*dialOptions)
}
type dialOptions struct {
readTimeout time.Duration
writeTimeout time.Duration
dial func(network, addr string) (net.Conn, error)
}
// DialReadTimeout specifies the timeout for reading a single command reply.
func DialReadTimeout(d time.Duration) DialOption {
return DialOption{func(do *dialOptions) {
do.readTimeout = d
}}
}
// DialWriteTimeout specifies the timeout for writing a single command.
func DialWriteTimeout(d time.Duration) DialOption {
return DialOption{func(do *dialOptions) {
do.writeTimeout = d
}}
}
// DialConnectTimeout specifies the timeout for connecting to the Memcache server.
func DialConnectTimeout(d time.Duration) DialOption {
return DialOption{func(do *dialOptions) {
dialer := net.Dialer{Timeout: d}
do.dial = dialer.Dial
}}
}
// DialNetDial specifies a custom dial function for creating TCP
// connections. If this option is left out, then net.Dial is
// used. DialNetDial overrides DialConnectTimeout.
func DialNetDial(dial func(network, addr string) (net.Conn, error)) DialOption {
return DialOption{func(do *dialOptions) {
do.dial = dial
}}
}
// Dial connects to the Memcache server at the given network and
// address using the specified options.
func Dial(network, address string, options ...DialOption) (Conn, error) {
do := dialOptions{
dial: net.Dial,
}
for _, option := range options {
option.f(&do)
}
netConn, err := do.dial(network, address)
if err != nil {
return nil, pkgerr.WithStack(err)
}
return NewConn(netConn, do.readTimeout, do.writeTimeout), nil
}
// NewConn returns a new memcache connection for the given net connection.
func NewConn(netConn net.Conn, readTimeout, writeTimeout time.Duration) Conn {
if writeTimeout <= 0 || readTimeout <= 0 {
panic("must config memcache timeout")
}
c := &conn{
conn: netConn,
rw: bufio.NewReadWriter(bufio.NewReader(netConn),
bufio.NewWriter(netConn)),
readTimeout: readTimeout,
writeTimeout: writeTimeout,
}
c.jd = json.NewDecoder(&c.jr)
c.je = json.NewEncoder(&c.edb)
c.gw = gzip.NewWriter(&c.cb)
c.edb.Grow(_encodeBuf)
// NOTE reuse bytes.Buffer internal buf
// DON'T concurrency call Scan
c.ped = proto.NewBuffer(c.edb.Bytes())
return c
}
func (c *conn) Close() error {
c.mu.Lock()
err := c.err
if c.err == nil {
c.err = pkgerr.New("memcache: closed")
err = c.conn.Close()
}
c.mu.Unlock()
return err
}
func (c *conn) fatal(err error) error {
c.mu.Lock()
if c.err == nil {
c.err = pkgerr.WithStack(err)
// Close connection to force errors on subsequent calls and to unblock
// other reader or writer.
c.conn.Close()
}
c.mu.Unlock()
return c.err
}
func (c *conn) Err() error {
c.mu.Lock()
err := c.err
c.mu.Unlock()
return err
}
func (c *conn) Add(item *Item) error {
return c.populate("add", item)
}
func (c *conn) Set(item *Item) error {
return c.populate("set", item)
}
func (c *conn) Replace(item *Item) error {
return c.populate("replace", item)
}
func (c *conn) CompareAndSwap(item *Item) error {
return c.populate("cas", item)
}
func (c *conn) populate(cmd string, item *Item) (err error) {
if !legalKey(item.Key) {
return pkgerr.WithStack(ErrMalformedKey)
}
var res []byte
if res, err = c.encode(item); err != nil {
return
}
l := len(res)
count := l/(_largeValue) + 1
if count == 1 {
item.Value = res
return c.populateOne(cmd, item)
}
nItem := &Item{
Key: item.Key,
Value: []byte(strconv.Itoa(l)),
Expiration: item.Expiration,
Flags: item.Flags | flagLargeValue,
}
err = c.populateOne(cmd, nItem)
if err != nil {
return
}
k := item.Key
nItem.Flags = item.Flags
for i := 1; i <= count; i++ {
if i == count {
nItem.Value = res[_largeValue*(count-1):]
} else {
nItem.Value = res[_largeValue*(i-1) : _largeValue*i]
}
nItem.Key = fmt.Sprintf("%s%d", k, i)
if err = c.populateOne(cmd, nItem); err != nil {
return
}
}
return
}
func (c *conn) populateOne(cmd string, item *Item) (err error) {
if c.writeTimeout != 0 {
c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
}
// <command name> <key> <flags> <exptime> <bytes> [noreply]\r\n
if cmd == "cas" {
_, err = fmt.Fprintf(c.rw, "%s %s %d %d %d %d\r\n",
cmd, item.Key, item.Flags, item.Expiration, len(item.Value), item.cas)
} else {
_, err = fmt.Fprintf(c.rw, "%s %s %d %d %d\r\n",
cmd, item.Key, item.Flags, item.Expiration, len(item.Value))
}
if err != nil {
return c.fatal(err)
}
c.rw.Write(item.Value)
c.rw.Write(crlf)
if err = c.rw.Flush(); err != nil {
return c.fatal(err)
}
if c.readTimeout != 0 {
c.conn.SetReadDeadline(time.Now().Add(c.readTimeout))
}
line, err := c.rw.ReadSlice('\n')
if err != nil {
return c.fatal(err)
}
switch {
case bytes.Equal(line, replyStored):
return nil
case bytes.Equal(line, replyNotStored):
return ErrNotStored
case bytes.Equal(line, replyExists):
return ErrCASConflict
case bytes.Equal(line, replyNotFound):
return ErrNotFound
}
return pkgerr.WithStack(protocolError(string(line)))
}
func (c *conn) Get(key string) (r *Item, err error) {
if !legalKey(key) {
return nil, pkgerr.WithStack(ErrMalformedKey)
}
if c.writeTimeout != 0 {
c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
}
if _, err = fmt.Fprintf(c.rw, "gets %s\r\n", key); err != nil {
return nil, c.fatal(err)
}
if err = c.rw.Flush(); err != nil {
return nil, c.fatal(err)
}
if err = c.parseGetReply(func(it *Item) {
r = it
}); err != nil {
return
}
if r == nil {
err = ErrNotFound
return
}
if r.Flags&flagLargeValue != flagLargeValue {
return
}
if r, err = c.getLargeValue(r); err != nil {
return
}
return
}
func (c *conn) GetMulti(keys []string) (res map[string]*Item, err error) {
for _, key := range keys {
if !legalKey(key) {
return nil, pkgerr.WithStack(ErrMalformedKey)
}
}
if c.writeTimeout != 0 {
c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
}
if _, err = fmt.Fprintf(c.rw, "gets %s\r\n", strings.Join(keys, " ")); err != nil {
return nil, c.fatal(err)
}
if err = c.rw.Flush(); err != nil {
return nil, c.fatal(err)
}
res = make(map[string]*Item, len(keys))
if err = c.parseGetReply(func(it *Item) {
res[it.Key] = it
}); err != nil {
return
}
for k, v := range res {
if v.Flags&flagLargeValue != flagLargeValue {
continue
}
r, err := c.getLargeValue(v)
if err != nil {
return res, err
}
res[k] = r
}
return
}
func (c *conn) getMulti(keys []string) (res map[string]*Item, err error) {
for _, key := range keys {
if !legalKey(key) {
return nil, pkgerr.WithStack(ErrMalformedKey)
}
}
if c.writeTimeout != 0 {
c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
}
if _, err = fmt.Fprintf(c.rw, "gets %s\r\n", strings.Join(keys, " ")); err != nil {
return nil, c.fatal(err)
}
if err = c.rw.Flush(); err != nil {
return nil, c.fatal(err)
}
res = make(map[string]*Item, len(keys))
err = c.parseGetReply(func(it *Item) {
res[it.Key] = it
})
return
}
func (c *conn) getLargeValue(it *Item) (r *Item, err error) {
l, err := strconv.Atoi(string(it.Value))
if err != nil {
return
}
count := l/_largeValue + 1
keys := make([]string, 0, count)
for i := 1; i <= count; i++ {
keys = append(keys, fmt.Sprintf("%s%d", it.Key, i))
}
items, err := c.getMulti(keys)
if err != nil {
return
}
if len(items) < count {
err = ErrNotFound
return
}
v := make([]byte, 0, l)
for _, k := range keys {
if items[k] == nil || items[k].Value == nil {
err = ErrNotFound
return
}
v = append(v, items[k].Value...)
}
it.Value = v
it.Flags = it.Flags ^ flagLargeValue
r = it
return
}
func (c *conn) parseGetReply(f func(*Item)) error {
if c.readTimeout != 0 {
c.conn.SetReadDeadline(time.Now().Add(c.readTimeout))
}
for {
line, err := c.rw.ReadSlice('\n')
if err != nil {
return c.fatal(err)
}
if bytes.Equal(line, replyEnd) {
return nil
}
if bytes.HasPrefix(line, replyServerErrorPrefix) {
errMsg := line[len(replyServerErrorPrefix):]
return c.fatal(protocolError(errMsg))
}
it := new(Item)
size, err := scanGetReply(line, it)
if err != nil {
return c.fatal(err)
}
it.Value = make([]byte, size+2)
if _, err = io.ReadFull(c.rw, it.Value); err != nil {
return c.fatal(err)
}
if !bytes.HasSuffix(it.Value, crlf) {
return c.fatal(protocolError("corrupt get reply, no except CRLF"))
}
it.Value = it.Value[:size]
f(it)
}
}
func scanGetReply(line []byte, item *Item) (size int, err error) {
if !bytes.HasSuffix(line, crlf) {
return 0, protocolError("corrupt get reply, no except CRLF")
}
// VALUE <key> <flags> <bytes> [<cas unique>]
chunks := strings.Split(string(line[:len(line)-2]), spaceStr)
if len(chunks) < 4 {
return 0, protocolError("corrupt get reply")
}
if chunks[0] != replyValueStr {
return 0, protocolError("corrupt get reply, no except VALUE")
}
item.Key = chunks[1]
flags64, err := strconv.ParseUint(chunks[2], 10, 32)
if err != nil {
return 0, err
}
item.Flags = uint32(flags64)
if size, err = strconv.Atoi(chunks[3]); err != nil {
return
}
if len(chunks) > 4 {
item.cas, err = strconv.ParseUint(chunks[4], 10, 64)
}
return
}
func (c *conn) Touch(key string, expire int32) (err error) {
if !legalKey(key) {
return pkgerr.WithStack(ErrMalformedKey)
}
line, err := c.writeReadLine("touch %s %d\r\n", key, expire)
if err != nil {
return err
}
switch {
case bytes.Equal(line, replyTouched):
return nil
case bytes.Equal(line, replyNotFound):
return ErrNotFound
default:
return pkgerr.WithStack(protocolError(string(line)))
}
}
func (c *conn) Increment(key string, delta uint64) (uint64, error) {
return c.incrDecr("incr", key, delta)
}
func (c *conn) Decrement(key string, delta uint64) (newValue uint64, err error) {
return c.incrDecr("decr", key, delta)
}
func (c *conn) incrDecr(cmd, key string, delta uint64) (uint64, error) {
if !legalKey(key) {
return 0, pkgerr.WithStack(ErrMalformedKey)
}
line, err := c.writeReadLine("%s %s %d\r\n", cmd, key, delta)
if err != nil {
return 0, err
}
switch {
case bytes.Equal(line, replyNotFound):
return 0, ErrNotFound
case bytes.HasPrefix(line, replyClientErrorPrefix):
errMsg := line[len(replyClientErrorPrefix):]
return 0, pkgerr.WithStack(protocolError(errMsg))
}
val, err := strconv.ParseUint(string(line[:len(line)-2]), 10, 64)
if err != nil {
return 0, err
}
return val, nil
}
func (c *conn) Delete(key string) (err error) {
if !legalKey(key) {
return pkgerr.WithStack(ErrMalformedKey)
}
line, err := c.writeReadLine("delete %s\r\n", key)
if err != nil {
return err
}
switch {
case bytes.Equal(line, replyOK):
return nil
case bytes.Equal(line, replyDeleted):
return nil
case bytes.Equal(line, replyNotStored):
return ErrNotStored
case bytes.Equal(line, replyExists):
return ErrCASConflict
case bytes.Equal(line, replyNotFound):
return ErrNotFound
}
return pkgerr.WithStack(protocolError(string(line)))
}
func (c *conn) writeReadLine(format string, args ...interface{}) ([]byte, error) {
if c.writeTimeout != 0 {
c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
}
_, err := fmt.Fprintf(c.rw, format, args...)
if err != nil {
return nil, c.fatal(pkgerr.WithStack(err))
}
if err = c.rw.Flush(); err != nil {
return nil, c.fatal(pkgerr.WithStack(err))
}
if c.readTimeout != 0 {
c.conn.SetReadDeadline(time.Now().Add(c.readTimeout))
}
line, err := c.rw.ReadSlice('\n')
if err != nil {
return line, c.fatal(pkgerr.WithStack(err))
}
return line, nil
}
func (c *conn) Scan(item *Item, v interface{}) (err error) {
c.ir.Reset(item.Value)
if item.Flags&FlagGzip == FlagGzip {
if err = c.gr.Reset(&c.ir); err != nil {
return
}
if err = c.decode(&c.gr, item, v); err != nil {
err = pkgerr.WithStack(err)
return
}
err = c.gr.Close()
} else {
err = c.decode(&c.ir, item, v)
}
err = pkgerr.WithStack(err)
return
}
func (c *conn) WithContext(ctx context.Context) Conn {
// FIXME: implement WithContext
return c
}
func (c *conn) encode(item *Item) (data []byte, err error) {
if (item.Flags | _flagEncoding) == _flagEncoding {
if item.Value == nil {
return nil, ErrItem
}
} else if item.Object == nil {
return nil, ErrItem
}
// encoding
switch {
case item.Flags&FlagGOB == FlagGOB:
c.edb.Reset()
if err = gob.NewEncoder(&c.edb).Encode(item.Object); err != nil {
return
}
data = c.edb.Bytes()
case item.Flags&FlagProtobuf == FlagProtobuf:
c.edb.Reset()
c.ped.SetBuf(c.edb.Bytes())
pb, ok := item.Object.(proto.Message)
if !ok {
err = ErrItemObject
return
}
if err = c.ped.Marshal(pb); err != nil {
return
}
data = c.ped.Bytes()
case item.Flags&FlagJSON == FlagJSON:
c.edb.Reset()
if err = c.je.Encode(item.Object); err != nil {
return
}
data = c.edb.Bytes()
default:
data = item.Value
}
// compress
if item.Flags&FlagGzip == FlagGzip {
c.cb.Reset()
c.gw.Reset(&c.cb)
if _, err = c.gw.Write(data); err != nil {
return
}
if err = c.gw.Close(); err != nil {
return
}
data = c.cb.Bytes()
}
if len(data) > 8000000 {
err = ErrValueSize
}
return
}
func (c *conn) decode(rd io.Reader, item *Item, v interface{}) (err error) {
var data []byte
switch {
case item.Flags&FlagGOB == FlagGOB:
err = gob.NewDecoder(rd).Decode(v)
case item.Flags&FlagJSON == FlagJSON:
c.jr.Reset(rd)
err = c.jd.Decode(v)
default:
data = item.Value
if item.Flags&FlagGzip == FlagGzip {
c.edb.Reset()
if _, err = io.Copy(&c.edb, rd); err != nil {
return
}
data = c.edb.Bytes()
}
if item.Flags&FlagProtobuf == FlagProtobuf {
m, ok := v.(proto.Message)
if !ok {
err = ErrItemObject
return
}
c.ped.SetBuf(data)
err = c.ped.Unmarshal(m)
} else {
switch v.(type) {
case *[]byte:
d := v.(*[]byte)
*d = data
case *string:
d := v.(*string)
*d = string(data)
case interface{}:
err = json.Unmarshal(data, v)
}
}
}
return
}
func legalKey(key string) bool {
if len(key) > 250 || len(key) == 0 {
return false
}
for i := 0; i < len(key); i++ {
if key[i] <= ' ' || key[i] == 0x7f {
return false
}
}
return true
}

@ -0,0 +1,76 @@
package memcache
import (
"errors"
"fmt"
"strings"
pkgerr "github.com/pkg/errors"
)
var (
// ErrNotFound not found
ErrNotFound = errors.New("memcache: key not found")
// ErrExists exists
ErrExists = errors.New("memcache: key exists")
// ErrNotStored not stored
ErrNotStored = errors.New("memcache: key not stored")
// ErrCASConflict means that a CompareAndSwap call failed due to the
// cached value being modified between the Get and the CompareAndSwap.
// If the cached value was simply evicted rather than replaced,
// ErrNotStored will be returned instead.
ErrCASConflict = errors.New("memcache: compare-and-swap conflict")
// ErrPoolExhausted is returned from a pool connection method (Store, Get,
// Delete, IncrDecr, Err) when the maximum number of database connections
// in the pool has been reached.
ErrPoolExhausted = errors.New("memcache: connection pool exhausted")
// ErrPoolClosed pool closed
ErrPoolClosed = errors.New("memcache: connection pool closed")
// ErrConnClosed conn closed
ErrConnClosed = errors.New("memcache: connection closed")
// ErrMalformedKey is returned when an invalid key is used.
// Keys must be at maximum 250 bytes long and not
// contain whitespace or control characters.
ErrMalformedKey = errors.New("memcache: malformed key is too long or contains invalid characters")
// ErrValueSize item value size must less than 1mb
ErrValueSize = errors.New("memcache: item value size must not greater than 1mb")
// ErrStat stat error for monitor
ErrStat = errors.New("memcache unexpected errors")
// ErrItem item nil.
ErrItem = errors.New("memcache: item object nil")
// ErrItemObject object type Assertion failed
ErrItemObject = errors.New("memcache: item object protobuf type assertion failed")
)
type protocolError string
func (pe protocolError) Error() string {
return fmt.Sprintf("memcache: %s (possible server error or unsupported concurrent read by application)", string(pe))
}
func formatErr(err error) string {
e := pkgerr.Cause(err)
switch e {
case ErrNotFound, ErrExists, ErrNotStored, nil:
return ""
default:
es := e.Error()
switch {
case strings.HasPrefix(es, "read"):
return "read timeout"
case strings.HasPrefix(es, "dial"):
return "dial timeout"
case strings.HasPrefix(es, "write"):
return "write timeout"
case strings.Contains(es, "EOF"):
return "eof"
case strings.Contains(es, "reset"):
return "reset"
case strings.Contains(es, "broken"):
return "broken pipe"
default:
return "unexpected err"
}
}
}

@ -0,0 +1,136 @@
package memcache
import (
"context"
)
// Error represents an error returned in a command reply.
type Error string
func (err Error) Error() string { return string(err) }
const (
// Flag, 15(encoding) bit+ 17(compress) bit
// FlagRAW default flag.
FlagRAW = uint32(0)
// FlagGOB gob encoding.
FlagGOB = uint32(1) << 0
// FlagJSON json encoding.
FlagJSON = uint32(1) << 1
// FlagProtobuf protobuf
FlagProtobuf = uint32(1) << 2
_flagEncoding = uint32(0xFFFF8000)
// FlagGzip gzip compress.
FlagGzip = uint32(1) << 15
// left mv 31??? not work!!!
flagLargeValue = uint32(1) << 30
)
// Item is an reply to be got or stored in a memcached server.
type Item struct {
// Key is the Item's key (250 bytes maximum).
Key string
// Value is the Item's value.
Value []byte
// Object is the Item's object for use codec.
Object interface{}
// Flags are server-opaque flags whose semantics are entirely
// up to the app.
Flags uint32
// Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
Expiration int32
// Compare and swap ID.
cas uint64
}
// Conn represents a connection to a Memcache server.
// Command Reference: https://github.com/memcached/memcached/wiki/Commands
type Conn interface {
// Close closes the connection.
Close() error
// Err returns a non-nil value if the connection is broken. The returned
// value is either the first non-nil value returned from the underlying
// network connection or a protocol parsing error. Applications should
// close broken connections.
Err() error
// Add writes the given item, if no value already exists for its key.
// ErrNotStored is returned if that condition is not met.
Add(item *Item) error
// Set writes the given item, unconditionally.
Set(item *Item) error
// Replace writes the given item, but only if the server *does* already
// hold data for this key.
Replace(item *Item) error
// Get sends a command to the server for gets data.
Get(key string) (*Item, error)
// GetMulti is a batch version of Get. The returned map from keys to items
// may have fewer elements than the input slice, due to memcache cache
// misses. Each key must be at most 250 bytes in length.
// If no error is returned, the returned map will also be non-nil.
GetMulti(keys []string) (map[string]*Item, error)
// Delete deletes the item with the provided key.
// The error ErrCacheMiss is returned if the item didn't already exist in
// the cache.
Delete(key string) error
// Increment atomically increments key by delta. The return value is the
// new value after being incremented or an error. If the value didn't exist
// in memcached the error is ErrCacheMiss. The value in memcached must be
// an decimal number, or an error will be returned.
// On 64-bit overflow, the new value wraps around.
Increment(key string, delta uint64) (newValue uint64, err error)
// Decrement atomically decrements key by delta. The return value is the
// new value after being decremented or an error. If the value didn't exist
// in memcached the error is ErrCacheMiss. The value in memcached must be
// an decimal number, or an error will be returned. On underflow, the new
// value is capped at zero and does not wrap around.
Decrement(key string, delta uint64) (newValue uint64, err error)
// CompareAndSwap writes the given item that was previously returned by
// Get, if the value was neither modified or evicted between the Get and
// the CompareAndSwap calls. The item's Key should not change between calls
// but all other item fields may differ. ErrCASConflict is returned if the
// value was modified in between the calls.
// ErrNotStored is returned if the value was evicted in between the calls.
CompareAndSwap(item *Item) error
// Touch updates the expiry for the given key. The seconds parameter is
// either a Unix timestamp or, if seconds is less than 1 month, the number
// of seconds into the future at which time the item will expire.
//ErrCacheMiss is returned if the key is not in the cache. The key must be
// at most 250 bytes in length.
Touch(key string, seconds int32) (err error)
// Scan converts value read from the memcache into the following
// common Go types and special types:
//
// *string
// *[]byte
// *interface{}
//
Scan(item *Item, v interface{}) (err error)
// WithContext return a Conn with its context changed to ctx
// the context controls the entire lifetime of Conn before you change it
// NOTE: this method is not thread-safe
WithContext(ctx context.Context) Conn
}

@ -0,0 +1,59 @@
package memcache
import (
"context"
)
// MockErr for unit test.
type MockErr struct {
Error error
}
var _ Conn = MockErr{}
// MockWith return a mock conn.
func MockWith(err error) MockErr {
return MockErr{Error: err}
}
// Err .
func (m MockErr) Err() error { return m.Error }
// Close .
func (m MockErr) Close() error { return m.Error }
// Add .
func (m MockErr) Add(item *Item) error { return m.Error }
// Set .
func (m MockErr) Set(item *Item) error { return m.Error }
// Replace .
func (m MockErr) Replace(item *Item) error { return m.Error }
// CompareAndSwap .
func (m MockErr) CompareAndSwap(item *Item) error { return m.Error }
// Get .
func (m MockErr) Get(key string) (*Item, error) { return nil, m.Error }
// GetMulti .
func (m MockErr) GetMulti(keys []string) (map[string]*Item, error) { return nil, m.Error }
// Touch .
func (m MockErr) Touch(key string, timeout int32) error { return m.Error }
// Delete .
func (m MockErr) Delete(key string) error { return m.Error }
// Increment .
func (m MockErr) Increment(key string, delta uint64) (uint64, error) { return 0, m.Error }
// Decrement .
func (m MockErr) Decrement(key string, delta uint64) (uint64, error) { return 0, m.Error }
// Scan .
func (m MockErr) Scan(item *Item, v interface{}) error { return m.Error }
// WithContext .
func (m MockErr) WithContext(ctx context.Context) Conn { return m }

@ -0,0 +1,197 @@
package memcache
import (
"context"
"io"
"time"
"github.com/bilibili/Kratos/pkg/container/pool"
"github.com/bilibili/Kratos/pkg/stat"
xtime "github.com/bilibili/Kratos/pkg/time"
)
var stats = stat.Cache
// Config memcache config.
type Config struct {
*pool.Config
Name string // memcache name, for trace
Proto string
Addr string
DialTimeout xtime.Duration
ReadTimeout xtime.Duration
WriteTimeout xtime.Duration
}
// Pool memcache connection pool struct.
type Pool struct {
p pool.Pool
c *Config
}
// NewPool new a memcache conn pool.
func NewPool(c *Config) (p *Pool) {
if c.DialTimeout <= 0 || c.ReadTimeout <= 0 || c.WriteTimeout <= 0 {
panic("must config memcache timeout")
}
p1 := pool.NewList(c.Config)
cnop := DialConnectTimeout(time.Duration(c.DialTimeout))
rdop := DialReadTimeout(time.Duration(c.ReadTimeout))
wrop := DialWriteTimeout(time.Duration(c.WriteTimeout))
p1.New = func(ctx context.Context) (io.Closer, error) {
conn, err := Dial(c.Proto, c.Addr, cnop, rdop, wrop)
return &traceConn{Conn: conn, address: c.Addr}, err
}
p = &Pool{p: p1, c: c}
return
}
// Get gets a connection. The application must close the returned connection.
// This method always returns a valid connection so that applications can defer
// error handling to the first use of the connection. If there is an error
// getting an underlying connection, then the connection Err, Do, Send, Flush
// and Receive methods return that error.
func (p *Pool) Get(ctx context.Context) Conn {
c, err := p.p.Get(ctx)
if err != nil {
return errorConnection{err}
}
c1, _ := c.(Conn)
return &pooledConnection{p: p, c: c1.WithContext(ctx), ctx: ctx}
}
// Close release the resources used by the pool.
func (p *Pool) Close() error {
return p.p.Close()
}
type pooledConnection struct {
p *Pool
c Conn
ctx context.Context
}
func pstat(key string, t time.Time, err error) {
stats.Timing(key, int64(time.Since(t)/time.Millisecond))
if err != nil {
if msg := formatErr(err); msg != "" {
stats.Incr("memcache", msg)
}
}
}
func (pc *pooledConnection) Close() error {
c := pc.c
if _, ok := c.(errorConnection); ok {
return nil
}
pc.c = errorConnection{ErrConnClosed}
pc.p.p.Put(context.Background(), c, c.Err() != nil)
return nil
}
func (pc *pooledConnection) Err() error {
return pc.c.Err()
}
func (pc *pooledConnection) Set(item *Item) (err error) {
now := time.Now()
err = pc.c.Set(item)
pstat("memcache:set", now, err)
return
}
func (pc *pooledConnection) Add(item *Item) (err error) {
now := time.Now()
err = pc.c.Add(item)
pstat("memcache:add", now, err)
return
}
func (pc *pooledConnection) Replace(item *Item) (err error) {
now := time.Now()
err = pc.c.Replace(item)
pstat("memcache:replace", now, err)
return
}
func (pc *pooledConnection) CompareAndSwap(item *Item) (err error) {
now := time.Now()
err = pc.c.CompareAndSwap(item)
pstat("memcache:cas", now, err)
return
}
func (pc *pooledConnection) Get(key string) (r *Item, err error) {
now := time.Now()
r, err = pc.c.Get(key)
pstat("memcache:get", now, err)
return
}
func (pc *pooledConnection) GetMulti(keys []string) (res map[string]*Item, err error) {
// if keys is empty slice returns empty map direct
if len(keys) == 0 {
return make(map[string]*Item), nil
}
now := time.Now()
res, err = pc.c.GetMulti(keys)
pstat("memcache:gets", now, err)
return
}
func (pc *pooledConnection) Touch(key string, timeout int32) (err error) {
now := time.Now()
err = pc.c.Touch(key, timeout)
pstat("memcache:touch", now, err)
return
}
func (pc *pooledConnection) Scan(item *Item, v interface{}) error {
return pc.c.Scan(item, v)
}
func (pc *pooledConnection) WithContext(ctx context.Context) Conn {
// TODO: set context
pc.ctx = ctx
return pc
}
func (pc *pooledConnection) Delete(key string) (err error) {
now := time.Now()
err = pc.c.Delete(key)
pstat("memcache:delete", now, err)
return
}
func (pc *pooledConnection) Increment(key string, delta uint64) (newValue uint64, err error) {
now := time.Now()
newValue, err = pc.c.Increment(key, delta)
pstat("memcache:increment", now, err)
return
}
func (pc *pooledConnection) Decrement(key string, delta uint64) (newValue uint64, err error) {
now := time.Now()
newValue, err = pc.c.Decrement(key, delta)
pstat("memcache:decrement", now, err)
return
}
type errorConnection struct{ err error }
func (ec errorConnection) Err() error { return ec.err }
func (ec errorConnection) Close() error { return ec.err }
func (ec errorConnection) Add(item *Item) error { return ec.err }
func (ec errorConnection) Set(item *Item) error { return ec.err }
func (ec errorConnection) Replace(item *Item) error { return ec.err }
func (ec errorConnection) CompareAndSwap(item *Item) error { return ec.err }
func (ec errorConnection) Get(key string) (*Item, error) { return nil, ec.err }
func (ec errorConnection) GetMulti(keys []string) (map[string]*Item, error) { return nil, ec.err }
func (ec errorConnection) Touch(key string, timeout int32) error { return ec.err }
func (ec errorConnection) Delete(key string) error { return ec.err }
func (ec errorConnection) Increment(key string, delta uint64) (uint64, error) { return 0, ec.err }
func (ec errorConnection) Decrement(key string, delta uint64) (uint64, error) { return 0, ec.err }
func (ec errorConnection) Scan(item *Item, v interface{}) error { return ec.err }
func (ec errorConnection) WithContext(ctx context.Context) Conn { return ec }

@ -0,0 +1,109 @@
package memcache
import (
"context"
"strconv"
"strings"
"time"
"github.com/bilibili/Kratos/pkg/log"
"github.com/bilibili/Kratos/pkg/net/trace"
)
const (
_traceFamily = "memcache"
_traceSpanKind = "client"
_traceComponentName = "library/cache/memcache"
_tracePeerService = "memcache"
_slowLogDuration = time.Millisecond * 250
)
type traceConn struct {
Conn
ctx context.Context
address string
}
func (t *traceConn) setTrace(action, statement string) func(error) error {
now := time.Now()
parent, ok := trace.FromContext(t.ctx)
if !ok {
return func(err error) error { return err }
}
span := parent.Fork(_traceFamily, "Memcache:"+action)
span.SetTag(
trace.String(trace.TagSpanKind, _traceSpanKind),
trace.String(trace.TagComponent, _traceComponentName),
trace.String(trace.TagPeerService, _tracePeerService),
trace.String(trace.TagPeerAddress, t.address),
trace.String(trace.TagDBStatement, action+" "+statement),
)
return func(err error) error {
span.Finish(&err)
t := time.Since(now)
if t > _slowLogDuration {
log.Warn("%s slow log action: %s key: %s time: %v", _traceFamily, action, statement, t)
}
return err
}
}
func (t *traceConn) WithContext(ctx context.Context) Conn {
t.ctx = ctx
t.Conn = t.Conn.WithContext(ctx)
return t
}
func (t *traceConn) Add(item *Item) error {
finishFn := t.setTrace("Add", item.Key)
return finishFn(t.Conn.Add(item))
}
func (t *traceConn) Set(item *Item) error {
finishFn := t.setTrace("Set", item.Key)
return finishFn(t.Conn.Set(item))
}
func (t *traceConn) Replace(item *Item) error {
finishFn := t.setTrace("Replace", item.Key)
return finishFn(t.Conn.Replace(item))
}
func (t *traceConn) Get(key string) (*Item, error) {
finishFn := t.setTrace("Get", key)
item, err := t.Conn.Get(key)
return item, finishFn(err)
}
func (t *traceConn) GetMulti(keys []string) (map[string]*Item, error) {
finishFn := t.setTrace("GetMulti", strings.Join(keys, " "))
items, err := t.Conn.GetMulti(keys)
return items, finishFn(err)
}
func (t *traceConn) Delete(key string) error {
finishFn := t.setTrace("Delete", key)
return finishFn(t.Conn.Delete(key))
}
func (t *traceConn) Increment(key string, delta uint64) (newValue uint64, err error) {
finishFn := t.setTrace("Increment", key+" "+strconv.FormatUint(delta, 10))
newValue, err = t.Conn.Increment(key, delta)
return newValue, finishFn(err)
}
func (t *traceConn) Decrement(key string, delta uint64) (newValue uint64, err error) {
finishFn := t.setTrace("Decrement", key+" "+strconv.FormatUint(delta, 10))
newValue, err = t.Conn.Decrement(key, delta)
return newValue, finishFn(err)
}
func (t *traceConn) CompareAndSwap(item *Item) error {
finishFn := t.setTrace("CompareAndSwap", item.Key)
return finishFn(t.Conn.CompareAndSwap(item))
}
func (t *traceConn) Touch(key string, seconds int32) (err error) {
finishFn := t.setTrace("Touch", key+" "+strconv.Itoa(int(seconds)))
return finishFn(t.Conn.Touch(key, seconds))
}

@ -0,0 +1,32 @@
package memcache
import (
"github.com/gogo/protobuf/proto"
)
// RawItem item with FlagRAW flag.
//
// Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
func RawItem(key string, data []byte, flags uint32, expiration int32) *Item {
return &Item{Key: key, Flags: flags | FlagRAW, Value: data, Expiration: expiration}
}
// JSONItem item with FlagJSON flag.
//
// Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
func JSONItem(key string, v interface{}, flags uint32, expiration int32) *Item {
return &Item{Key: key, Flags: flags | FlagJSON, Object: v, Expiration: expiration}
}
// ProtobufItem item with FlagProtobuf flag.
//
// Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
func ProtobufItem(key string, message proto.Message, flags uint32, expiration int32) *Item {
return &Item{Key: key, Flags: flags | FlagProtobuf, Object: message, Expiration: expiration}
}

@ -0,0 +1,49 @@
Redigo
======
[![Build Status](https://travis-ci.org/garyburd/redigo.svg?branch=master)](https://travis-ci.org/garyburd/redigo)
Redigo is a [Go](http://golang.org/) client for the [Redis](http://redis.io/) database.
Features
-------
* A [Print-like](http://godoc.org/github.com/garyburd/redigo/redis#hdr-Executing_Commands) API with support for all Redis commands.
* [Pipelining](http://godoc.org/github.com/garyburd/redigo/redis#hdr-Pipelining), including pipelined transactions.
* [Publish/Subscribe](http://godoc.org/github.com/garyburd/redigo/redis#hdr-Publish_and_Subscribe).
* [Connection pooling](http://godoc.org/github.com/garyburd/redigo/redis#Pool).
* [Script helper type](http://godoc.org/github.com/garyburd/redigo/redis#Script) with optimistic use of EVALSHA.
* [Helper functions](http://godoc.org/github.com/garyburd/redigo/redis#hdr-Reply_Helpers) for working with command replies.
Documentation
-------------
- [API Reference](http://godoc.org/github.com/garyburd/redigo/redis)
- [FAQ](https://github.com/garyburd/redigo/wiki/FAQ)
Installation
------------
Install Redigo using the "go get" command:
go get github.com/garyburd/redigo/redis
The Go distribution is Redigo's only dependency.
Related Projects
----------------
- [rafaeljusto/redigomock](https://godoc.org/github.com/rafaeljusto/redigomock) - A mock library for Redigo.
- [chasex/redis-go-cluster](https://github.com/chasex/redis-go-cluster) - A Redis cluster client implementation.
Contributing
------------
Gary is looking for someone to take over maintenance of this project. If you are interested, contact Gary at the email address listed on his GitHub profile page.
PRs for major features will not be accepted until a new maintainer is found. Bug reports and PRs for bug fixes are welcome.
License
-------
Redigo is available under the [Apache License, Version 2.0](http://www.apache.org/licenses/LICENSE-2.0.html).

@ -0,0 +1,57 @@
// Copyright 2014 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis
import (
"strings"
)
// redis state
const (
WatchState = 1 << iota
MultiState
SubscribeState
MonitorState
)
// CommandInfo command info.
type CommandInfo struct {
Set, Clear int
}
var commandInfos = map[string]CommandInfo{
"WATCH": {Set: WatchState},
"UNWATCH": {Clear: WatchState},
"MULTI": {Set: MultiState},
"EXEC": {Clear: WatchState | MultiState},
"DISCARD": {Clear: WatchState | MultiState},
"PSUBSCRIBE": {Set: SubscribeState},
"SUBSCRIBE": {Set: SubscribeState},
"MONITOR": {Set: MonitorState},
}
func init() {
for n, ci := range commandInfos {
commandInfos[strings.ToLower(n)] = ci
}
}
// LookupCommandInfo get command info.
func LookupCommandInfo(commandName string) CommandInfo {
if ci, ok := commandInfos[commandName]; ok {
return ci
}
return commandInfos[strings.ToUpper(commandName)]
}

@ -0,0 +1,597 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis
import (
"bufio"
"bytes"
"context"
"fmt"
"io"
"net"
"net/url"
"regexp"
"strconv"
"sync"
"time"
"github.com/bilibili/Kratos/pkg/stat"
"github.com/pkg/errors"
)
var stats = stat.Cache
// conn is the low-level implementation of Conn
type conn struct {
// Shared
mu sync.Mutex
pending int
err error
conn net.Conn
// Read
readTimeout time.Duration
br *bufio.Reader
// Write
writeTimeout time.Duration
bw *bufio.Writer
// Scratch space for formatting argument length.
// '*' or '$', length, "\r\n"
lenScratch [32]byte
// Scratch space for formatting integers and floats.
numScratch [40]byte
// stat func,default prom
stat func(string, *error) func()
}
func statfunc(cmd string, err *error) func() {
now := time.Now()
return func() {
stats.Timing(fmt.Sprintf("redis:%s", cmd), int64(time.Since(now)/time.Millisecond))
if err != nil {
if msg := formatErr(*err); msg != "" {
stats.Incr("redis", msg)
}
}
}
}
// DialTimeout acts like Dial but takes timeouts for establishing the
// connection to the server, writing a command and reading a reply.
//
// Deprecated: Use Dial with options instead.
func DialTimeout(network, address string, connectTimeout, readTimeout, writeTimeout time.Duration) (Conn, error) {
return Dial(network, address,
DialConnectTimeout(connectTimeout),
DialReadTimeout(readTimeout),
DialWriteTimeout(writeTimeout))
}
// DialOption specifies an option for dialing a Redis server.
type DialOption struct {
f func(*dialOptions)
}
type dialOptions struct {
readTimeout time.Duration
writeTimeout time.Duration
dial func(network, addr string) (net.Conn, error)
db int
password string
stat func(string, *error) func()
}
// DialStats specifies stat func for stats.default statfunc.
func DialStats(fn func(string, *error) func()) DialOption {
return DialOption{func(do *dialOptions) {
do.stat = fn
}}
}
// DialReadTimeout specifies the timeout for reading a single command reply.
func DialReadTimeout(d time.Duration) DialOption {
return DialOption{func(do *dialOptions) {
do.readTimeout = d
}}
}
// DialWriteTimeout specifies the timeout for writing a single command.
func DialWriteTimeout(d time.Duration) DialOption {
return DialOption{func(do *dialOptions) {
do.writeTimeout = d
}}
}
// DialConnectTimeout specifies the timeout for connecting to the Redis server.
func DialConnectTimeout(d time.Duration) DialOption {
return DialOption{func(do *dialOptions) {
dialer := net.Dialer{Timeout: d}
do.dial = dialer.Dial
}}
}
// DialNetDial specifies a custom dial function for creating TCP
// connections. If this option is left out, then net.Dial is
// used. DialNetDial overrides DialConnectTimeout.
func DialNetDial(dial func(network, addr string) (net.Conn, error)) DialOption {
return DialOption{func(do *dialOptions) {
do.dial = dial
}}
}
// DialDatabase specifies the database to select when dialing a connection.
func DialDatabase(db int) DialOption {
return DialOption{func(do *dialOptions) {
do.db = db
}}
}
// DialPassword specifies the password to use when connecting to
// the Redis server.
func DialPassword(password string) DialOption {
return DialOption{func(do *dialOptions) {
do.password = password
}}
}
// Dial connects to the Redis server at the given network and
// address using the specified options.
func Dial(network, address string, options ...DialOption) (Conn, error) {
do := dialOptions{
dial: net.Dial,
}
for _, option := range options {
option.f(&do)
}
netConn, err := do.dial(network, address)
if err != nil {
return nil, errors.WithStack(err)
}
c := &conn{
conn: netConn,
bw: bufio.NewWriter(netConn),
br: bufio.NewReader(netConn),
readTimeout: do.readTimeout,
writeTimeout: do.writeTimeout,
stat: statfunc,
}
if do.password != "" {
if _, err := c.Do("AUTH", do.password); err != nil {
netConn.Close()
return nil, errors.WithStack(err)
}
}
if do.db != 0 {
if _, err := c.Do("SELECT", do.db); err != nil {
netConn.Close()
return nil, errors.WithStack(err)
}
}
if do.stat != nil {
c.stat = do.stat
}
return c, nil
}
var pathDBRegexp = regexp.MustCompile(`/(\d+)\z`)
// DialURL connects to a Redis server at the given URL using the Redis
// URI scheme. URLs should follow the draft IANA specification for the
// scheme (https://www.iana.org/assignments/uri-schemes/prov/redis).
func DialURL(rawurl string, options ...DialOption) (Conn, error) {
u, err := url.Parse(rawurl)
if err != nil {
return nil, errors.WithStack(err)
}
if u.Scheme != "redis" {
return nil, fmt.Errorf("invalid redis URL scheme: %s", u.Scheme)
}
// As per the IANA draft spec, the host defaults to localhost and
// the port defaults to 6379.
host, port, err := net.SplitHostPort(u.Host)
if err != nil {
// assume port is missing
host = u.Host
port = "6379"
}
if host == "" {
host = "localhost"
}
address := net.JoinHostPort(host, port)
if u.User != nil {
password, isSet := u.User.Password()
if isSet {
options = append(options, DialPassword(password))
}
}
match := pathDBRegexp.FindStringSubmatch(u.Path)
if len(match) == 2 {
db, err := strconv.Atoi(match[1])
if err != nil {
return nil, errors.Errorf("invalid database: %s", u.Path[1:])
}
if db != 0 {
options = append(options, DialDatabase(db))
}
} else if u.Path != "" {
return nil, errors.Errorf("invalid database: %s", u.Path[1:])
}
return Dial("tcp", address, options...)
}
// NewConn new a redis conn.
func NewConn(c *Config) (cn Conn, err error) {
cnop := DialConnectTimeout(time.Duration(c.DialTimeout))
rdop := DialReadTimeout(time.Duration(c.ReadTimeout))
wrop := DialWriteTimeout(time.Duration(c.WriteTimeout))
auop := DialPassword(c.Auth)
// new conn
cn, err = Dial(c.Proto, c.Addr, cnop, rdop, wrop, auop)
return
}
func (c *conn) Close() error {
c.mu.Lock()
err := c.err
if c.err == nil {
c.err = errors.New("redigo: closed")
err = c.conn.Close()
}
c.mu.Unlock()
return err
}
func (c *conn) fatal(err error) error {
c.mu.Lock()
if c.err == nil {
c.err = err
// Close connection to force errors on subsequent calls and to unblock
// other reader or writer.
c.conn.Close()
}
c.mu.Unlock()
return errors.WithStack(c.err)
}
func (c *conn) Err() error {
c.mu.Lock()
err := c.err
c.mu.Unlock()
return err
}
func (c *conn) writeLen(prefix byte, n int) error {
c.lenScratch[len(c.lenScratch)-1] = '\n'
c.lenScratch[len(c.lenScratch)-2] = '\r'
i := len(c.lenScratch) - 3
for {
c.lenScratch[i] = byte('0' + n%10)
i--
n = n / 10
if n == 0 {
break
}
}
c.lenScratch[i] = prefix
_, err := c.bw.Write(c.lenScratch[i:])
return errors.WithStack(err)
}
func (c *conn) writeString(s string) error {
c.writeLen('$', len(s))
c.bw.WriteString(s)
_, err := c.bw.WriteString("\r\n")
return errors.WithStack(err)
}
func (c *conn) writeBytes(p []byte) error {
c.writeLen('$', len(p))
c.bw.Write(p)
_, err := c.bw.WriteString("\r\n")
return errors.WithStack(err)
}
func (c *conn) writeInt64(n int64) error {
return errors.WithStack(c.writeBytes(strconv.AppendInt(c.numScratch[:0], n, 10)))
}
func (c *conn) writeFloat64(n float64) error {
return errors.WithStack(c.writeBytes(strconv.AppendFloat(c.numScratch[:0], n, 'g', -1, 64)))
}
func (c *conn) writeCommand(cmd string, args []interface{}) (err error) {
if c.writeTimeout != 0 {
c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
}
c.writeLen('*', 1+len(args))
err = c.writeString(cmd)
for _, arg := range args {
if err != nil {
break
}
switch arg := arg.(type) {
case string:
err = c.writeString(arg)
case []byte:
err = c.writeBytes(arg)
case int:
err = c.writeInt64(int64(arg))
case int64:
err = c.writeInt64(arg)
case float64:
err = c.writeFloat64(arg)
case bool:
if arg {
err = c.writeString("1")
} else {
err = c.writeString("0")
}
case nil:
err = c.writeString("")
default:
var buf bytes.Buffer
fmt.Fprint(&buf, arg)
err = errors.WithStack(c.writeBytes(buf.Bytes()))
}
}
return err
}
type protocolError string
func (pe protocolError) Error() string {
return fmt.Sprintf("redigo: %s (possible server error or unsupported concurrent read by application)", string(pe))
}
func (c *conn) readLine() ([]byte, error) {
p, err := c.br.ReadSlice('\n')
if err == bufio.ErrBufferFull {
return nil, errors.WithStack(protocolError("long response line"))
}
if err != nil {
return nil, err
}
i := len(p) - 2
if i < 0 || p[i] != '\r' {
return nil, errors.WithStack(protocolError("bad response line terminator"))
}
return p[:i], nil
}
// parseLen parses bulk string and array lengths.
func parseLen(p []byte) (int, error) {
if len(p) == 0 {
return -1, errors.WithStack(protocolError("malformed length"))
}
if p[0] == '-' && len(p) == 2 && p[1] == '1' {
// handle $-1 and $-1 null replies.
return -1, nil
}
var n int
for _, b := range p {
n *= 10
if b < '0' || b > '9' {
return -1, errors.WithStack(protocolError("illegal bytes in length"))
}
n += int(b - '0')
}
return n, nil
}
// parseInt parses an integer reply.
func parseInt(p []byte) (interface{}, error) {
if len(p) == 0 {
return 0, errors.WithStack(protocolError("malformed integer"))
}
var negate bool
if p[0] == '-' {
negate = true
p = p[1:]
if len(p) == 0 {
return 0, errors.WithStack(protocolError("malformed integer"))
}
}
var n int64
for _, b := range p {
n *= 10
if b < '0' || b > '9' {
return 0, errors.WithStack(protocolError("illegal bytes in length"))
}
n += int64(b - '0')
}
if negate {
n = -n
}
return n, nil
}
var (
okReply interface{} = "OK"
pongReply interface{} = "PONG"
)
func (c *conn) readReply() (interface{}, error) {
line, err := c.readLine()
if err != nil {
return nil, err
}
if len(line) == 0 {
return nil, errors.WithStack(protocolError("short response line"))
}
switch line[0] {
case '+':
switch {
case len(line) == 3 && line[1] == 'O' && line[2] == 'K':
// Avoid allocation for frequent "+OK" response.
return okReply, nil
case len(line) == 5 && line[1] == 'P' && line[2] == 'O' && line[3] == 'N' && line[4] == 'G':
// Avoid allocation in PING command benchmarks :)
return pongReply, nil
default:
return string(line[1:]), nil
}
case '-':
return Error(string(line[1:])), nil
case ':':
return parseInt(line[1:])
case '$':
n, err := parseLen(line[1:])
if n < 0 || err != nil {
return nil, err
}
p := make([]byte, n)
_, err = io.ReadFull(c.br, p)
if err != nil {
return nil, errors.WithStack(err)
}
if line1, err := c.readLine(); err != nil {
return nil, err
} else if len(line1) != 0 {
return nil, errors.WithStack(protocolError("bad bulk string format"))
}
return p, nil
case '*':
n, err := parseLen(line[1:])
if n < 0 || err != nil {
return nil, err
}
r := make([]interface{}, n)
for i := range r {
r[i], err = c.readReply()
if err != nil {
return nil, err
}
}
return r, nil
}
return nil, errors.WithStack(protocolError("unexpected response line"))
}
func (c *conn) Send(cmd string, args ...interface{}) (err error) {
c.mu.Lock()
c.pending++
c.mu.Unlock()
if err = c.writeCommand(cmd, args); err != nil {
c.fatal(err)
}
return err
}
func (c *conn) Flush() (err error) {
if c.writeTimeout != 0 {
c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
}
if err = c.bw.Flush(); err != nil {
c.fatal(err)
}
return err
}
func (c *conn) Receive() (reply interface{}, err error) {
if c.readTimeout != 0 {
c.conn.SetReadDeadline(time.Now().Add(c.readTimeout))
}
if reply, err = c.readReply(); err != nil {
return nil, c.fatal(err)
}
// When using pub/sub, the number of receives can be greater than the
// number of sends. To enable normal use of the connection after
// unsubscribing from all channels, we do not decrement pending to a
// negative value.
//
// The pending field is decremented after the reply is read to handle the
// case where Receive is called before Send.
c.mu.Lock()
if c.pending > 0 {
c.pending--
}
c.mu.Unlock()
if err, ok := reply.(Error); ok {
return nil, err
}
return
}
func (c *conn) Do(cmd string, args ...interface{}) (interface{}, error) {
c.mu.Lock()
pending := c.pending
c.pending = 0
c.mu.Unlock()
if cmd == "" && pending == 0 {
return nil, nil
}
var err error
defer c.stat(cmd, &err)()
if cmd != "" {
err = c.writeCommand(cmd, args)
}
if err == nil {
err = errors.WithStack(c.bw.Flush())
}
if err != nil {
return nil, c.fatal(err)
}
if c.readTimeout != 0 {
c.conn.SetReadDeadline(time.Now().Add(c.readTimeout))
}
if cmd == "" {
reply := make([]interface{}, pending)
for i := range reply {
var r interface{}
r, err = c.readReply()
if err != nil {
break
}
reply[i] = r
}
if err != nil {
return nil, c.fatal(err)
}
return reply, nil
}
var reply interface{}
for i := 0; i <= pending; i++ {
var e error
if reply, e = c.readReply(); e != nil {
return nil, c.fatal(e)
}
if e, ok := reply.(Error); ok && err == nil {
err = e
}
}
return reply, err
}
// WithContext FIXME: implement WithContext
func (c *conn) WithContext(ctx context.Context) Conn { return c }

@ -0,0 +1,169 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
// Package redis is a client for the Redis database.
//
// The Redigo FAQ (https://github.com/garyburd/redigo/wiki/FAQ) contains more
// documentation about this package.
//
// Connections
//
// The Conn interface is the primary interface for working with Redis.
// Applications create connections by calling the Dial, DialWithTimeout or
// NewConn functions. In the future, functions will be added for creating
// sharded and other types of connections.
//
// The application must call the connection Close method when the application
// is done with the connection.
//
// Executing Commands
//
// The Conn interface has a generic method for executing Redis commands:
//
// Do(commandName string, args ...interface{}) (reply interface{}, err error)
//
// The Redis command reference (http://redis.io/commands) lists the available
// commands. An example of using the Redis APPEND command is:
//
// n, err := conn.Do("APPEND", "key", "value")
//
// The Do method converts command arguments to binary strings for transmission
// to the server as follows:
//
// Go Type Conversion
// []byte Sent as is
// string Sent as is
// int, int64 strconv.FormatInt(v)
// float64 strconv.FormatFloat(v, 'g', -1, 64)
// bool true -> "1", false -> "0"
// nil ""
// all other types fmt.Print(v)
//
// Redis command reply types are represented using the following Go types:
//
// Redis type Go type
// error redis.Error
// integer int64
// simple string string
// bulk string []byte or nil if value not present.
// array []interface{} or nil if value not present.
//
// Use type assertions or the reply helper functions to convert from
// interface{} to the specific Go type for the command result.
//
// Pipelining
//
// Connections support pipelining using the Send, Flush and Receive methods.
//
// Send(commandName string, args ...interface{}) error
// Flush() error
// Receive() (reply interface{}, err error)
//
// Send writes the command to the connection's output buffer. Flush flushes the
// connection's output buffer to the server. Receive reads a single reply from
// the server. The following example shows a simple pipeline.
//
// c.Send("SET", "foo", "bar")
// c.Send("GET", "foo")
// c.Flush()
// c.Receive() // reply from SET
// v, err = c.Receive() // reply from GET
//
// The Do method combines the functionality of the Send, Flush and Receive
// methods. The Do method starts by writing the command and flushing the output
// buffer. Next, the Do method receives all pending replies including the reply
// for the command just sent by Do. If any of the received replies is an error,
// then Do returns the error. If there are no errors, then Do returns the last
// reply. If the command argument to the Do method is "", then the Do method
// will flush the output buffer and receive pending replies without sending a
// command.
//
// Use the Send and Do methods to implement pipelined transactions.
//
// c.Send("MULTI")
// c.Send("INCR", "foo")
// c.Send("INCR", "bar")
// r, err := c.Do("EXEC")
// fmt.Println(r) // prints [1, 1]
//
// Concurrency
//
// Connections do not support concurrent calls to the write methods (Send,
// Flush) or concurrent calls to the read method (Receive). Connections do
// allow a concurrent reader and writer.
//
// Because the Do method combines the functionality of Send, Flush and Receive,
// the Do method cannot be called concurrently with the other methods.
//
// For full concurrent access to Redis, use the thread-safe Pool to get and
// release connections from within a goroutine.
//
// Publish and Subscribe
//
// Use the Send, Flush and Receive methods to implement Pub/Sub subscribers.
//
// c.Send("SUBSCRIBE", "example")
// c.Flush()
// for {
// reply, err := c.Receive()
// if err != nil {
// return err
// }
// // process pushed message
// }
//
// The PubSubConn type wraps a Conn with convenience methods for implementing
// subscribers. The Subscribe, PSubscribe, Unsubscribe and PUnsubscribe methods
// send and flush a subscription management command. The receive method
// converts a pushed message to convenient types for use in a type switch.
//
// psc := redis.PubSubConn{c}
// psc.Subscribe("example")
// for {
// switch v := psc.Receive().(type) {
// case redis.Message:
// fmt.Printf("%s: message: %s\n", v.Channel, v.Data)
// case redis.Subscription:
// fmt.Printf("%s: %s %d\n", v.Channel, v.Kind, v.Count)
// case error:
// return v
// }
// }
//
// Reply Helpers
//
// The Bool, Int, Bytes, String, Strings and Values functions convert a reply
// to a value of a specific type. To allow convenient wrapping of calls to the
// connection Do and Receive methods, the functions take a second argument of
// type error. If the error is non-nil, then the helper function returns the
// error. If the error is nil, the function converts the reply to the specified
// type:
//
// exists, err := redis.Bool(c.Do("EXISTS", "foo"))
// if err != nil {
// // handle error return from c.Do or type conversion error.
// }
//
// The Scan function converts elements of a array reply to Go types:
//
// var value1 int
// var value2 string
// reply, err := redis.Values(c.Do("MGET", "key1", "key2"))
// if err != nil {
// // handle error
// }
// if _, err := redis.Scan(reply, &value1, &value2); err != nil {
// // handle error
// }
package redis

@ -0,0 +1,33 @@
package redis
import (
"strings"
pkgerr "github.com/pkg/errors"
)
func formatErr(err error) string {
e := pkgerr.Cause(err)
switch e {
case ErrNil, nil:
return ""
default:
es := e.Error()
switch {
case strings.HasPrefix(es, "read"):
return "read timeout"
case strings.HasPrefix(es, "dial"):
return "dial timeout"
case strings.HasPrefix(es, "write"):
return "write timeout"
case strings.Contains(es, "EOF"):
return "eof"
case strings.Contains(es, "reset"):
return "reset"
case strings.Contains(es, "broken"):
return "broken pipe"
default:
return "unexpected err"
}
}
}

@ -0,0 +1,117 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis
import (
"bytes"
"fmt"
"log"
)
// NewLoggingConn returns a logging wrapper around a connection.
func NewLoggingConn(conn Conn, logger *log.Logger, prefix string) Conn {
if prefix != "" {
prefix = prefix + "."
}
return &loggingConn{conn, logger, prefix}
}
type loggingConn struct {
Conn
logger *log.Logger
prefix string
}
func (c *loggingConn) Close() error {
err := c.Conn.Close()
var buf bytes.Buffer
fmt.Fprintf(&buf, "%sClose() -> (%v)", c.prefix, err)
c.logger.Output(2, buf.String())
return err
}
func (c *loggingConn) printValue(buf *bytes.Buffer, v interface{}) {
const chop = 32
switch v := v.(type) {
case []byte:
if len(v) > chop {
fmt.Fprintf(buf, "%q...", v[:chop])
} else {
fmt.Fprintf(buf, "%q", v)
}
case string:
if len(v) > chop {
fmt.Fprintf(buf, "%q...", v[:chop])
} else {
fmt.Fprintf(buf, "%q", v)
}
case []interface{}:
if len(v) == 0 {
buf.WriteString("[]")
} else {
sep := "["
fin := "]"
if len(v) > chop {
v = v[:chop]
fin = "...]"
}
for _, vv := range v {
buf.WriteString(sep)
c.printValue(buf, vv)
sep = ", "
}
buf.WriteString(fin)
}
default:
fmt.Fprint(buf, v)
}
}
func (c *loggingConn) print(method, commandName string, args []interface{}, reply interface{}, err error) {
var buf bytes.Buffer
fmt.Fprintf(&buf, "%s%s(", c.prefix, method)
if method != "Receive" {
buf.WriteString(commandName)
for _, arg := range args {
buf.WriteString(", ")
c.printValue(&buf, arg)
}
}
buf.WriteString(") -> (")
if method != "Send" {
c.printValue(&buf, reply)
buf.WriteString(", ")
}
fmt.Fprintf(&buf, "%v)", err)
c.logger.Output(3, buf.String())
}
func (c *loggingConn) Do(commandName string, args ...interface{}) (interface{}, error) {
reply, err := c.Conn.Do(commandName, args...)
c.print("Do", commandName, args, reply, err)
return reply, err
}
func (c *loggingConn) Send(commandName string, args ...interface{}) error {
err := c.Conn.Send(commandName, args...)
c.print("Send", commandName, args, nil, err)
return err
}
func (c *loggingConn) Receive() (interface{}, error) {
reply, err := c.Conn.Receive()
c.print("Receive", "", nil, reply, err)
return reply, err
}

@ -0,0 +1,36 @@
package redis
import (
"context"
)
// MockErr for unit test.
type MockErr struct {
Error error
}
// MockWith return a mock conn.
func MockWith(err error) MockErr {
return MockErr{Error: err}
}
// Err .
func (m MockErr) Err() error { return m.Error }
// Close .
func (m MockErr) Close() error { return m.Error }
// Do .
func (m MockErr) Do(commandName string, args ...interface{}) (interface{}, error) { return nil, m.Error }
// Send .
func (m MockErr) Send(commandName string, args ...interface{}) error { return m.Error }
// Flush .
func (m MockErr) Flush() error { return m.Error }
// Receive .
func (m MockErr) Receive() (interface{}, error) { return nil, m.Error }
// WithContext .
func (m MockErr) WithContext(context.Context) Conn { return m }

@ -0,0 +1,218 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis
import (
"bytes"
"context"
"crypto/rand"
"crypto/sha1"
"errors"
"io"
"strconv"
"sync"
"time"
"github.com/bilibili/Kratos/pkg/container/pool"
"github.com/bilibili/Kratos/pkg/net/trace"
xtime "github.com/bilibili/Kratos/pkg/time"
)
var beginTime, _ = time.Parse("2006-01-02 15:04:05", "2006-01-02 15:04:05")
var (
errConnClosed = errors.New("redigo: connection closed")
)
// Pool .
type Pool struct {
*pool.Slice
// config
c *Config
}
// Config client settings.
type Config struct {
*pool.Config
Name string // redis name, for trace
Proto string
Addr string
Auth string
DialTimeout xtime.Duration
ReadTimeout xtime.Duration
WriteTimeout xtime.Duration
}
// NewPool creates a new pool.
func NewPool(c *Config, options ...DialOption) (p *Pool) {
if c.DialTimeout <= 0 || c.ReadTimeout <= 0 || c.WriteTimeout <= 0 {
panic("must config redis timeout")
}
p1 := pool.NewSlice(c.Config)
cnop := DialConnectTimeout(time.Duration(c.DialTimeout))
options = append(options, cnop)
rdop := DialReadTimeout(time.Duration(c.ReadTimeout))
options = append(options, rdop)
wrop := DialWriteTimeout(time.Duration(c.WriteTimeout))
options = append(options, wrop)
auop := DialPassword(c.Auth)
options = append(options, auop)
// new pool
p1.New = func(ctx context.Context) (io.Closer, error) {
conn, err := Dial(c.Proto, c.Addr, options...)
if err != nil {
return nil, err
}
return &traceConn{Conn: conn, connTags: []trace.Tag{trace.TagString(trace.TagPeerAddress, c.Addr)}}, nil
}
p = &Pool{Slice: p1, c: c}
return
}
// Get gets a connection. The application must close the returned connection.
// This method always returns a valid connection so that applications can defer
// error handling to the first use of the connection. If there is an error
// getting an underlying connection, then the connection Err, Do, Send, Flush
// and Receive methods return that error.
func (p *Pool) Get(ctx context.Context) Conn {
c, err := p.Slice.Get(ctx)
if err != nil {
return errorConnection{err}
}
c1, _ := c.(Conn)
return &pooledConnection{p: p, c: c1.WithContext(ctx), ctx: ctx, now: beginTime}
}
// Close releases the resources used by the pool.
func (p *Pool) Close() error {
return p.Slice.Close()
}
type pooledConnection struct {
p *Pool
c Conn
state int
now time.Time
cmds []string
ctx context.Context
}
var (
sentinel []byte
sentinelOnce sync.Once
)
func initSentinel() {
p := make([]byte, 64)
if _, err := rand.Read(p); err == nil {
sentinel = p
} else {
h := sha1.New()
io.WriteString(h, "Oops, rand failed. Use time instead.")
io.WriteString(h, strconv.FormatInt(time.Now().UnixNano(), 10))
sentinel = h.Sum(nil)
}
}
func (pc *pooledConnection) Close() error {
c := pc.c
if _, ok := c.(errorConnection); ok {
return nil
}
pc.c = errorConnection{errConnClosed}
if pc.state&MultiState != 0 {
c.Send("DISCARD")
pc.state &^= (MultiState | WatchState)
} else if pc.state&WatchState != 0 {
c.Send("UNWATCH")
pc.state &^= WatchState
}
if pc.state&SubscribeState != 0 {
c.Send("UNSUBSCRIBE")
c.Send("PUNSUBSCRIBE")
// To detect the end of the message stream, ask the server to echo
// a sentinel value and read until we see that value.
sentinelOnce.Do(initSentinel)
c.Send("ECHO", sentinel)
c.Flush()
for {
p, err := c.Receive()
if err != nil {
break
}
if p, ok := p.([]byte); ok && bytes.Equal(p, sentinel) {
pc.state &^= SubscribeState
break
}
}
}
_, err := c.Do("")
pc.p.Slice.Put(context.Background(), c, pc.state != 0 || c.Err() != nil)
return err
}
func (pc *pooledConnection) Err() error {
return pc.c.Err()
}
func (pc *pooledConnection) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
ci := LookupCommandInfo(commandName)
pc.state = (pc.state | ci.Set) &^ ci.Clear
reply, err = pc.c.Do(commandName, args...)
return
}
func (pc *pooledConnection) Send(commandName string, args ...interface{}) (err error) {
ci := LookupCommandInfo(commandName)
pc.state = (pc.state | ci.Set) &^ ci.Clear
if pc.now.Equal(beginTime) {
// mark first send time
pc.now = time.Now()
}
pc.cmds = append(pc.cmds, commandName)
return pc.c.Send(commandName, args...)
}
func (pc *pooledConnection) Flush() error {
return pc.c.Flush()
}
func (pc *pooledConnection) Receive() (reply interface{}, err error) {
reply, err = pc.c.Receive()
if len(pc.cmds) > 0 {
pc.cmds = pc.cmds[1:]
}
return
}
func (pc *pooledConnection) WithContext(ctx context.Context) Conn {
pc.ctx = ctx
return pc
}
type errorConnection struct{ err error }
func (ec errorConnection) Do(string, ...interface{}) (interface{}, error) {
return nil, ec.err
}
func (ec errorConnection) Send(string, ...interface{}) error { return ec.err }
func (ec errorConnection) Err() error { return ec.err }
func (ec errorConnection) Close() error { return ec.err }
func (ec errorConnection) Flush() error { return ec.err }
func (ec errorConnection) Receive() (interface{}, error) { return nil, ec.err }
func (ec errorConnection) WithContext(context.Context) Conn { return ec }

@ -0,0 +1,152 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis
import (
"errors"
pkgerr "github.com/pkg/errors"
)
var (
errPubSub = errors.New("redigo: unknown pubsub notification")
)
// Subscription represents a subscribe or unsubscribe notification.
type Subscription struct {
// Kind is "subscribe", "unsubscribe", "psubscribe" or "punsubscribe"
Kind string
// The channel that was changed.
Channel string
// The current number of subscriptions for connection.
Count int
}
// Message represents a message notification.
type Message struct {
// The originating channel.
Channel string
// The message data.
Data []byte
}
// PMessage represents a pmessage notification.
type PMessage struct {
// The matched pattern.
Pattern string
// The originating channel.
Channel string
// The message data.
Data []byte
}
// Pong represents a pubsub pong notification.
type Pong struct {
Data string
}
// PubSubConn wraps a Conn with convenience methods for subscribers.
type PubSubConn struct {
Conn Conn
}
// Close closes the connection.
func (c PubSubConn) Close() error {
return c.Conn.Close()
}
// Subscribe subscribes the connection to the specified channels.
func (c PubSubConn) Subscribe(channel ...interface{}) error {
c.Conn.Send("SUBSCRIBE", channel...)
return c.Conn.Flush()
}
// PSubscribe subscribes the connection to the given patterns.
func (c PubSubConn) PSubscribe(channel ...interface{}) error {
c.Conn.Send("PSUBSCRIBE", channel...)
return c.Conn.Flush()
}
// Unsubscribe unsubscribes the connection from the given channels, or from all
// of them if none is given.
func (c PubSubConn) Unsubscribe(channel ...interface{}) error {
c.Conn.Send("UNSUBSCRIBE", channel...)
return c.Conn.Flush()
}
// PUnsubscribe unsubscribes the connection from the given patterns, or from all
// of them if none is given.
func (c PubSubConn) PUnsubscribe(channel ...interface{}) error {
c.Conn.Send("PUNSUBSCRIBE", channel...)
return c.Conn.Flush()
}
// Ping sends a PING to the server with the specified data.
func (c PubSubConn) Ping(data string) error {
c.Conn.Send("PING", data)
return c.Conn.Flush()
}
// Receive returns a pushed message as a Subscription, Message, PMessage, Pong
// or error. The return value is intended to be used directly in a type switch
// as illustrated in the PubSubConn example.
func (c PubSubConn) Receive() interface{} {
reply, err := Values(c.Conn.Receive())
if err != nil {
return err
}
var kind string
reply, err = Scan(reply, &kind)
if err != nil {
return err
}
switch kind {
case "message":
var m Message
if _, err := Scan(reply, &m.Channel, &m.Data); err != nil {
return err
}
return m
case "pmessage":
var pm PMessage
if _, err := Scan(reply, &pm.Pattern, &pm.Channel, &pm.Data); err != nil {
return err
}
return pm
case "subscribe", "psubscribe", "unsubscribe", "punsubscribe":
s := Subscription{Kind: kind}
if _, err := Scan(reply, &s.Channel, &s.Count); err != nil {
return err
}
return s
case "pong":
var p Pong
if _, err := Scan(reply, &p.Data); err != nil {
return err
}
return p
}
return pkgerr.WithStack(errPubSub)
}

@ -0,0 +1,51 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis
import (
"context"
)
// Error represents an error returned in a command reply.
type Error string
func (err Error) Error() string { return string(err) }
// Conn represents a connection to a Redis server.
type Conn interface {
// Close closes the connection.
Close() error
// Err returns a non-nil value if the connection is broken. The returned
// value is either the first non-nil value returned from the underlying
// network connection or a protocol parsing error. Applications should
// close broken connections.
Err() error
// Do sends a command to the server and returns the received reply.
Do(commandName string, args ...interface{}) (reply interface{}, err error)
// Send writes the command to the client's output buffer.
Send(commandName string, args ...interface{}) error
// Flush flushes the output buffer to the Redis server.
Flush() error
// Receive receives a single reply from the Redis server
Receive() (reply interface{}, err error)
// WithContext
WithContext(ctx context.Context) Conn
}

@ -0,0 +1,409 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis
import (
"errors"
"strconv"
pkgerr "github.com/pkg/errors"
)
// ErrNil indicates that a reply value is nil.
var ErrNil = errors.New("redigo: nil returned")
// Int is a helper that converts a command reply to an integer. If err is not
// equal to nil, then Int returns 0, err. Otherwise, Int converts the
// reply to an int as follows:
//
// Reply type Result
// integer int(reply), nil
// bulk string parsed reply, nil
// nil 0, ErrNil
// other 0, error
func Int(reply interface{}, err error) (int, error) {
if err != nil {
return 0, err
}
switch reply := reply.(type) {
case int64:
x := int(reply)
if int64(x) != reply {
return 0, pkgerr.WithStack(strconv.ErrRange)
}
return x, nil
case []byte:
n, err := strconv.ParseInt(string(reply), 10, 0)
return int(n), pkgerr.WithStack(err)
case nil:
return 0, ErrNil
case Error:
return 0, reply
}
return 0, pkgerr.Errorf("redigo: unexpected type for Int, got type %T", reply)
}
// Int64 is a helper that converts a command reply to 64 bit integer. If err is
// not equal to nil, then Int returns 0, err. Otherwise, Int64 converts the
// reply to an int64 as follows:
//
// Reply type Result
// integer reply, nil
// bulk string parsed reply, nil
// nil 0, ErrNil
// other 0, error
func Int64(reply interface{}, err error) (int64, error) {
if err != nil {
return 0, err
}
switch reply := reply.(type) {
case int64:
return reply, nil
case []byte:
n, err := strconv.ParseInt(string(reply), 10, 64)
return n, pkgerr.WithStack(err)
case nil:
return 0, ErrNil
case Error:
return 0, reply
}
return 0, pkgerr.Errorf("redigo: unexpected type for Int64, got type %T", reply)
}
var errNegativeInt = errors.New("redigo: unexpected value for Uint64")
// Uint64 is a helper that converts a command reply to 64 bit integer. If err is
// not equal to nil, then Int returns 0, err. Otherwise, Int64 converts the
// reply to an int64 as follows:
//
// Reply type Result
// integer reply, nil
// bulk string parsed reply, nil
// nil 0, ErrNil
// other 0, error
func Uint64(reply interface{}, err error) (uint64, error) {
if err != nil {
return 0, err
}
switch reply := reply.(type) {
case int64:
if reply < 0 {
return 0, pkgerr.WithStack(errNegativeInt)
}
return uint64(reply), nil
case []byte:
n, err := strconv.ParseUint(string(reply), 10, 64)
return n, err
case nil:
return 0, ErrNil
case Error:
return 0, reply
}
return 0, pkgerr.Errorf("redigo: unexpected type for Uint64, got type %T", reply)
}
// Float64 is a helper that converts a command reply to 64 bit float. If err is
// not equal to nil, then Float64 returns 0, err. Otherwise, Float64 converts
// the reply to an int as follows:
//
// Reply type Result
// bulk string parsed reply, nil
// nil 0, ErrNil
// other 0, error
func Float64(reply interface{}, err error) (float64, error) {
if err != nil {
return 0, err
}
switch reply := reply.(type) {
case []byte:
n, err := strconv.ParseFloat(string(reply), 64)
return n, pkgerr.WithStack(err)
case nil:
return 0, ErrNil
case Error:
return 0, reply
}
return 0, pkgerr.Errorf("redigo: unexpected type for Float64, got type %T", reply)
}
// String is a helper that converts a command reply to a string. If err is not
// equal to nil, then String returns "", err. Otherwise String converts the
// reply to a string as follows:
//
// Reply type Result
// bulk string string(reply), nil
// simple string reply, nil
// nil "", ErrNil
// other "", error
func String(reply interface{}, err error) (string, error) {
if err != nil {
return "", err
}
switch reply := reply.(type) {
case []byte:
return string(reply), nil
case string:
return reply, nil
case nil:
return "", ErrNil
case Error:
return "", reply
}
return "", pkgerr.Errorf("redigo: unexpected type for String, got type %T", reply)
}
// Bytes is a helper that converts a command reply to a slice of bytes. If err
// is not equal to nil, then Bytes returns nil, err. Otherwise Bytes converts
// the reply to a slice of bytes as follows:
//
// Reply type Result
// bulk string reply, nil
// simple string []byte(reply), nil
// nil nil, ErrNil
// other nil, error
func Bytes(reply interface{}, err error) ([]byte, error) {
if err != nil {
return nil, err
}
switch reply := reply.(type) {
case []byte:
return reply, nil
case string:
return []byte(reply), nil
case nil:
return nil, ErrNil
case Error:
return nil, reply
}
return nil, pkgerr.Errorf("redigo: unexpected type for Bytes, got type %T", reply)
}
// Bool is a helper that converts a command reply to a boolean. If err is not
// equal to nil, then Bool returns false, err. Otherwise Bool converts the
// reply to boolean as follows:
//
// Reply type Result
// integer value != 0, nil
// bulk string strconv.ParseBool(reply)
// nil false, ErrNil
// other false, error
func Bool(reply interface{}, err error) (bool, error) {
if err != nil {
return false, err
}
switch reply := reply.(type) {
case int64:
return reply != 0, nil
case []byte:
b, e := strconv.ParseBool(string(reply))
return b, pkgerr.WithStack(e)
case nil:
return false, ErrNil
case Error:
return false, reply
}
return false, pkgerr.Errorf("redigo: unexpected type for Bool, got type %T", reply)
}
// MultiBulk is a helper that converts an array command reply to a []interface{}.
//
// Deprecated: Use Values instead.
func MultiBulk(reply interface{}, err error) ([]interface{}, error) { return Values(reply, err) }
// Values is a helper that converts an array command reply to a []interface{}.
// If err is not equal to nil, then Values returns nil, err. Otherwise, Values
// converts the reply as follows:
//
// Reply type Result
// array reply, nil
// nil nil, ErrNil
// other nil, error
func Values(reply interface{}, err error) ([]interface{}, error) {
if err != nil {
return nil, err
}
switch reply := reply.(type) {
case []interface{}:
return reply, nil
case nil:
return nil, ErrNil
case Error:
return nil, reply
}
return nil, pkgerr.Errorf("redigo: unexpected type for Values, got type %T", reply)
}
// Strings is a helper that converts an array command reply to a []string. If
// err is not equal to nil, then Strings returns nil, err. Nil array items are
// converted to "" in the output slice. Strings returns an error if an array
// item is not a bulk string or nil.
func Strings(reply interface{}, err error) ([]string, error) {
if err != nil {
return nil, err
}
switch reply := reply.(type) {
case []interface{}:
result := make([]string, len(reply))
for i := range reply {
if reply[i] == nil {
continue
}
p, ok := reply[i].([]byte)
if !ok {
return nil, pkgerr.Errorf("redigo: unexpected element type for Strings, got type %T", reply[i])
}
result[i] = string(p)
}
return result, nil
case nil:
return nil, ErrNil
case Error:
return nil, reply
}
return nil, pkgerr.Errorf("redigo: unexpected type for Strings, got type %T", reply)
}
// ByteSlices is a helper that converts an array command reply to a [][]byte.
// If err is not equal to nil, then ByteSlices returns nil, err. Nil array
// items are stay nil. ByteSlices returns an error if an array item is not a
// bulk string or nil.
func ByteSlices(reply interface{}, err error) ([][]byte, error) {
if err != nil {
return nil, err
}
switch reply := reply.(type) {
case []interface{}:
result := make([][]byte, len(reply))
for i := range reply {
if reply[i] == nil {
continue
}
p, ok := reply[i].([]byte)
if !ok {
return nil, pkgerr.Errorf("redigo: unexpected element type for ByteSlices, got type %T", reply[i])
}
result[i] = p
}
return result, nil
case nil:
return nil, ErrNil
case Error:
return nil, reply
}
return nil, pkgerr.Errorf("redigo: unexpected type for ByteSlices, got type %T", reply)
}
// Ints is a helper that converts an array command reply to a []int. If
// err is not equal to nil, then Ints returns nil, err.
func Ints(reply interface{}, err error) ([]int, error) {
var ints []int
values, err := Values(reply, err)
if err != nil {
return ints, err
}
if err := ScanSlice(values, &ints); err != nil {
return ints, err
}
return ints, nil
}
// Int64s is a helper that converts an array command reply to a []int64. If
// err is not equal to nil, then Int64s returns nil, err.
func Int64s(reply interface{}, err error) ([]int64, error) {
var int64s []int64
values, err := Values(reply, err)
if err != nil {
return int64s, err
}
if err := ScanSlice(values, &int64s); err != nil {
return int64s, err
}
return int64s, nil
}
// StringMap is a helper that converts an array of strings (alternating key, value)
// into a map[string]string. The HGETALL and CONFIG GET commands return replies in this format.
// Requires an even number of values in result.
func StringMap(result interface{}, err error) (map[string]string, error) {
values, err := Values(result, err)
if err != nil {
return nil, err
}
if len(values)%2 != 0 {
return nil, pkgerr.New("redigo: StringMap expects even number of values result")
}
m := make(map[string]string, len(values)/2)
for i := 0; i < len(values); i += 2 {
key, okKey := values[i].([]byte)
value, okValue := values[i+1].([]byte)
if !okKey || !okValue {
return nil, pkgerr.New("redigo: ScanMap key not a bulk string value")
}
m[string(key)] = string(value)
}
return m, nil
}
// IntMap is a helper that converts an array of strings (alternating key, value)
// into a map[string]int. The HGETALL commands return replies in this format.
// Requires an even number of values in result.
func IntMap(result interface{}, err error) (map[string]int, error) {
values, err := Values(result, err)
if err != nil {
return nil, err
}
if len(values)%2 != 0 {
return nil, pkgerr.New("redigo: IntMap expects even number of values result")
}
m := make(map[string]int, len(values)/2)
for i := 0; i < len(values); i += 2 {
key, ok := values[i].([]byte)
if !ok {
return nil, pkgerr.New("redigo: ScanMap key not a bulk string value")
}
value, err := Int(values[i+1], nil)
if err != nil {
return nil, err
}
m[string(key)] = value
}
return m, nil
}
// Int64Map is a helper that converts an array of strings (alternating key, value)
// into a map[string]int64. The HGETALL commands return replies in this format.
// Requires an even number of values in result.
func Int64Map(result interface{}, err error) (map[string]int64, error) {
values, err := Values(result, err)
if err != nil {
return nil, err
}
if len(values)%2 != 0 {
return nil, pkgerr.New("redigo: Int64Map expects even number of values result")
}
m := make(map[string]int64, len(values)/2)
for i := 0; i < len(values); i += 2 {
key, ok := values[i].([]byte)
if !ok {
return nil, pkgerr.New("redigo: ScanMap key not a bulk string value")
}
value, err := Int64(values[i+1], nil)
if err != nil {
return nil, err
}
m[string(key)] = value
}
return m, nil
}

@ -0,0 +1,559 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis
import (
"errors"
"fmt"
"reflect"
"strconv"
"strings"
"sync"
pkgerr "github.com/pkg/errors"
)
func ensureLen(d reflect.Value, n int) {
if n > d.Cap() {
d.Set(reflect.MakeSlice(d.Type(), n, n))
} else {
d.SetLen(n)
}
}
func cannotConvert(d reflect.Value, s interface{}) error {
var sname string
switch s.(type) {
case string:
sname = "Redis simple string"
case Error:
sname = "Redis error"
case int64:
sname = "Redis integer"
case []byte:
sname = "Redis bulk string"
case []interface{}:
sname = "Redis array"
default:
sname = reflect.TypeOf(s).String()
}
return pkgerr.Errorf("cannot convert from %s to %s", sname, d.Type())
}
func convertAssignBulkString(d reflect.Value, s []byte) (err error) {
switch d.Type().Kind() {
case reflect.Float32, reflect.Float64:
var x float64
x, err = strconv.ParseFloat(string(s), d.Type().Bits())
d.SetFloat(x)
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
var x int64
x, err = strconv.ParseInt(string(s), 10, d.Type().Bits())
d.SetInt(x)
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
var x uint64
x, err = strconv.ParseUint(string(s), 10, d.Type().Bits())
d.SetUint(x)
case reflect.Bool:
var x bool
x, err = strconv.ParseBool(string(s))
d.SetBool(x)
case reflect.String:
d.SetString(string(s))
case reflect.Slice:
if d.Type().Elem().Kind() != reflect.Uint8 {
err = cannotConvert(d, s)
} else {
d.SetBytes(s)
}
default:
err = cannotConvert(d, s)
}
err = pkgerr.WithStack(err)
return
}
func convertAssignInt(d reflect.Value, s int64) (err error) {
switch d.Type().Kind() {
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
d.SetInt(s)
if d.Int() != s {
err = strconv.ErrRange
d.SetInt(0)
}
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64:
if s < 0 {
err = strconv.ErrRange
} else {
x := uint64(s)
d.SetUint(x)
if d.Uint() != x {
err = strconv.ErrRange
d.SetUint(0)
}
}
case reflect.Bool:
d.SetBool(s != 0)
default:
err = cannotConvert(d, s)
}
err = pkgerr.WithStack(err)
return
}
func convertAssignValue(d reflect.Value, s interface{}) (err error) {
switch s := s.(type) {
case []byte:
err = convertAssignBulkString(d, s)
case int64:
err = convertAssignInt(d, s)
default:
err = cannotConvert(d, s)
}
return err
}
func convertAssignArray(d reflect.Value, s []interface{}) error {
if d.Type().Kind() != reflect.Slice {
return cannotConvert(d, s)
}
ensureLen(d, len(s))
for i := 0; i < len(s); i++ {
if err := convertAssignValue(d.Index(i), s[i]); err != nil {
return err
}
}
return nil
}
func convertAssign(d interface{}, s interface{}) (err error) {
// Handle the most common destination types using type switches and
// fall back to reflection for all other types.
switch s := s.(type) {
case nil:
// ingore
case []byte:
switch d := d.(type) {
case *string:
*d = string(s)
case *int:
*d, err = strconv.Atoi(string(s))
case *bool:
*d, err = strconv.ParseBool(string(s))
case *[]byte:
*d = s
case *interface{}:
*d = s
case nil:
// skip value
default:
if d := reflect.ValueOf(d); d.Type().Kind() != reflect.Ptr {
err = cannotConvert(d, s)
} else {
err = convertAssignBulkString(d.Elem(), s)
}
}
case int64:
switch d := d.(type) {
case *int:
x := int(s)
if int64(x) != s {
err = strconv.ErrRange
x = 0
}
*d = x
case *bool:
*d = s != 0
case *interface{}:
*d = s
case nil:
// skip value
default:
if d := reflect.ValueOf(d); d.Type().Kind() != reflect.Ptr {
err = cannotConvert(d, s)
} else {
err = convertAssignInt(d.Elem(), s)
}
}
case string:
switch d := d.(type) {
case *string:
*d = string(s)
default:
err = cannotConvert(reflect.ValueOf(d), s)
}
case []interface{}:
switch d := d.(type) {
case *[]interface{}:
*d = s
case *interface{}:
*d = s
case nil:
// skip value
default:
if d := reflect.ValueOf(d); d.Type().Kind() != reflect.Ptr {
err = cannotConvert(d, s)
} else {
err = convertAssignArray(d.Elem(), s)
}
}
case Error:
err = s
default:
err = cannotConvert(reflect.ValueOf(d), s)
}
err = pkgerr.WithStack(err)
return
}
// Scan copies from src to the values pointed at by dest.
//
// The values pointed at by dest must be an integer, float, boolean, string,
// []byte, interface{} or slices of these types. Scan uses the standard strconv
// package to convert bulk strings to numeric and boolean types.
//
// If a dest value is nil, then the corresponding src value is skipped.
//
// If a src element is nil, then the corresponding dest value is not modified.
//
// To enable easy use of Scan in a loop, Scan returns the slice of src
// following the copied values.
func Scan(src []interface{}, dest ...interface{}) ([]interface{}, error) {
if len(src) < len(dest) {
return nil, pkgerr.New("redigo.Scan: array short")
}
var err error
for i, d := range dest {
err = convertAssign(d, src[i])
if err != nil {
err = fmt.Errorf("redigo.Scan: cannot assign to dest %d: %v", i, err)
break
}
}
return src[len(dest):], err
}
type fieldSpec struct {
name string
index []int
omitEmpty bool
}
type structSpec struct {
m map[string]*fieldSpec
l []*fieldSpec
}
func (ss *structSpec) fieldSpec(name []byte) *fieldSpec {
return ss.m[string(name)]
}
func compileStructSpec(t reflect.Type, depth map[string]int, index []int, ss *structSpec) {
for i := 0; i < t.NumField(); i++ {
f := t.Field(i)
switch {
case f.PkgPath != "" && !f.Anonymous:
// Ignore unexported fields.
case f.Anonymous:
// TODO: Handle pointers. Requires change to decoder and
// protection against infinite recursion.
if f.Type.Kind() == reflect.Struct {
compileStructSpec(f.Type, depth, append(index, i), ss)
}
default:
fs := &fieldSpec{name: f.Name}
tag := f.Tag.Get("redis")
p := strings.Split(tag, ",")
if len(p) > 0 {
if p[0] == "-" {
continue
}
if len(p[0]) > 0 {
fs.name = p[0]
}
for _, s := range p[1:] {
switch s {
case "omitempty":
fs.omitEmpty = true
default:
panic(fmt.Errorf("redigo: unknown field tag %s for type %s", s, t.Name()))
}
}
}
d, found := depth[fs.name]
if !found {
d = 1 << 30
}
switch {
case len(index) == d:
// At same depth, remove from result.
delete(ss.m, fs.name)
j := 0
for i1 := 0; i1 < len(ss.l); i1++ {
if fs.name != ss.l[i1].name {
ss.l[j] = ss.l[i1]
j++
}
}
ss.l = ss.l[:j]
case len(index) < d:
fs.index = make([]int, len(index)+1)
copy(fs.index, index)
fs.index[len(index)] = i
depth[fs.name] = len(index)
ss.m[fs.name] = fs
ss.l = append(ss.l, fs)
}
}
}
}
var (
structSpecMutex sync.RWMutex
structSpecCache = make(map[reflect.Type]*structSpec)
)
func structSpecForType(t reflect.Type) *structSpec {
structSpecMutex.RLock()
ss, found := structSpecCache[t]
structSpecMutex.RUnlock()
if found {
return ss
}
structSpecMutex.Lock()
defer structSpecMutex.Unlock()
ss, found = structSpecCache[t]
if found {
return ss
}
ss = &structSpec{m: make(map[string]*fieldSpec)}
compileStructSpec(t, make(map[string]int), nil, ss)
structSpecCache[t] = ss
return ss
}
var errScanStructValue = errors.New("redigo.ScanStruct: value must be non-nil pointer to a struct")
// ScanStruct scans alternating names and values from src to a struct. The
// HGETALL and CONFIG GET commands return replies in this format.
//
// ScanStruct uses exported field names to match values in the response. Use
// 'redis' field tag to override the name:
//
// Field int `redis:"myName"`
//
// Fields with the tag redis:"-" are ignored.
//
// Integer, float, boolean, string and []byte fields are supported. Scan uses the
// standard strconv package to convert bulk string values to numeric and
// boolean types.
//
// If a src element is nil, then the corresponding field is not modified.
func ScanStruct(src []interface{}, dest interface{}) error {
d := reflect.ValueOf(dest)
if d.Kind() != reflect.Ptr || d.IsNil() {
return pkgerr.WithStack(errScanStructValue)
}
d = d.Elem()
if d.Kind() != reflect.Struct {
return pkgerr.WithStack(errScanStructValue)
}
ss := structSpecForType(d.Type())
if len(src)%2 != 0 {
return pkgerr.New("redigo.ScanStruct: number of values not a multiple of 2")
}
for i := 0; i < len(src); i += 2 {
s := src[i+1]
if s == nil {
continue
}
name, ok := src[i].([]byte)
if !ok {
return pkgerr.Errorf("redigo.ScanStruct: key %d not a bulk string value", i)
}
fs := ss.fieldSpec(name)
if fs == nil {
continue
}
if err := convertAssignValue(d.FieldByIndex(fs.index), s); err != nil {
return pkgerr.Errorf("redigo.ScanStruct: cannot assign field %s: %v", fs.name, err)
}
}
return nil
}
var (
errScanSliceValue = errors.New("redigo.ScanSlice: dest must be non-nil pointer to a struct")
)
// ScanSlice scans src to the slice pointed to by dest. The elements the dest
// slice must be integer, float, boolean, string, struct or pointer to struct
// values.
//
// Struct fields must be integer, float, boolean or string values. All struct
// fields are used unless a subset is specified using fieldNames.
func ScanSlice(src []interface{}, dest interface{}, fieldNames ...string) error {
d := reflect.ValueOf(dest)
if d.Kind() != reflect.Ptr || d.IsNil() {
return pkgerr.WithStack(errScanSliceValue)
}
d = d.Elem()
if d.Kind() != reflect.Slice {
return pkgerr.WithStack(errScanSliceValue)
}
isPtr := false
t := d.Type().Elem()
if t.Kind() == reflect.Ptr && t.Elem().Kind() == reflect.Struct {
isPtr = true
t = t.Elem()
}
if t.Kind() != reflect.Struct {
ensureLen(d, len(src))
for i, s := range src {
if s == nil {
continue
}
if err := convertAssignValue(d.Index(i), s); err != nil {
return pkgerr.Errorf("redigo.ScanSlice: cannot assign element %d: %v", i, err)
}
}
return nil
}
ss := structSpecForType(t)
fss := ss.l
if len(fieldNames) > 0 {
fss = make([]*fieldSpec, len(fieldNames))
for i, name := range fieldNames {
fss[i] = ss.m[name]
if fss[i] == nil {
return pkgerr.Errorf("redigo.ScanSlice: ScanSlice bad field name %s", name)
}
}
}
if len(fss) == 0 {
return pkgerr.New("redigo.ScanSlice: no struct fields")
}
n := len(src) / len(fss)
if n*len(fss) != len(src) {
return pkgerr.New("redigo.ScanSlice: length not a multiple of struct field count")
}
ensureLen(d, n)
for i := 0; i < n; i++ {
d1 := d.Index(i)
if isPtr {
if d1.IsNil() {
d1.Set(reflect.New(t))
}
d1 = d1.Elem()
}
for j, fs := range fss {
s := src[i*len(fss)+j]
if s == nil {
continue
}
if err := convertAssignValue(d1.FieldByIndex(fs.index), s); err != nil {
return pkgerr.Errorf("redigo.ScanSlice: cannot assign element %d to field %s: %v", i*len(fss)+j, fs.name, err)
}
}
}
return nil
}
// Args is a helper for constructing command arguments from structured values.
type Args []interface{}
// Add returns the result of appending value to args.
func (args Args) Add(value ...interface{}) Args {
return append(args, value...)
}
// AddFlat returns the result of appending the flattened value of v to args.
//
// Maps are flattened by appending the alternating keys and map values to args.
//
// Slices are flattened by appending the slice elements to args.
//
// Structs are flattened by appending the alternating names and values of
// exported fields to args. If v is a nil struct pointer, then nothing is
// appended. The 'redis' field tag overrides struct field names. See ScanStruct
// for more information on the use of the 'redis' field tag.
//
// Other types are appended to args as is.
func (args Args) AddFlat(v interface{}) Args {
rv := reflect.ValueOf(v)
switch rv.Kind() {
case reflect.Struct:
args = flattenStruct(args, rv)
case reflect.Slice:
for i := 0; i < rv.Len(); i++ {
args = append(args, rv.Index(i).Interface())
}
case reflect.Map:
for _, k := range rv.MapKeys() {
args = append(args, k.Interface(), rv.MapIndex(k).Interface())
}
case reflect.Ptr:
if rv.Type().Elem().Kind() == reflect.Struct {
if !rv.IsNil() {
args = flattenStruct(args, rv.Elem())
}
} else {
args = append(args, v)
}
default:
args = append(args, v)
}
return args
}
func flattenStruct(args Args, v reflect.Value) Args {
ss := structSpecForType(v.Type())
for _, fs := range ss.l {
fv := v.FieldByIndex(fs.index)
if fs.omitEmpty {
var empty = false
switch fv.Kind() {
case reflect.Array, reflect.Map, reflect.Slice, reflect.String:
empty = fv.Len() == 0
case reflect.Bool:
empty = !fv.Bool()
case reflect.Int, reflect.Int8, reflect.Int16, reflect.Int32, reflect.Int64:
empty = fv.Int() == 0
case reflect.Uint, reflect.Uint8, reflect.Uint16, reflect.Uint32, reflect.Uint64, reflect.Uintptr:
empty = fv.Uint() == 0
case reflect.Float32, reflect.Float64:
empty = fv.Float() == 0
case reflect.Interface, reflect.Ptr:
empty = fv.IsNil()
}
if empty {
continue
}
}
args = append(args, fs.name, fv.Interface())
}
return args
}

@ -0,0 +1,86 @@
// Copyright 2012 Gary Burd
//
// Licensed under the Apache License, Version 2.0 (the "License"): you may
// not use this file except in compliance with the License. You may obtain
// a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
// License for the specific language governing permissions and limitations
// under the License.
package redis
import (
"crypto/sha1"
"encoding/hex"
"io"
"strings"
)
// Script encapsulates the source, hash and key count for a Lua script. See
// http://redis.io/commands/eval for information on scripts in Redis.
type Script struct {
keyCount int
src string
hash string
}
// NewScript returns a new script object. If keyCount is greater than or equal
// to zero, then the count is automatically inserted in the EVAL command
// argument list. If keyCount is less than zero, then the application supplies
// the count as the first value in the keysAndArgs argument to the Do, Send and
// SendHash methods.
func NewScript(keyCount int, src string) *Script {
h := sha1.New()
io.WriteString(h, src)
return &Script{keyCount, src, hex.EncodeToString(h.Sum(nil))}
}
func (s *Script) args(spec string, keysAndArgs []interface{}) []interface{} {
var args []interface{}
if s.keyCount < 0 {
args = make([]interface{}, 1+len(keysAndArgs))
args[0] = spec
copy(args[1:], keysAndArgs)
} else {
args = make([]interface{}, 2+len(keysAndArgs))
args[0] = spec
args[1] = s.keyCount
copy(args[2:], keysAndArgs)
}
return args
}
// Do evaluates the script. Under the covers, Do optimistically evaluates the
// script using the EVALSHA command. If the command fails because the script is
// not loaded, then Do evaluates the script using the EVAL command (thus
// causing the script to load).
func (s *Script) Do(c Conn, keysAndArgs ...interface{}) (interface{}, error) {
v, err := c.Do("EVALSHA", s.args(s.hash, keysAndArgs)...)
if e, ok := err.(Error); ok && strings.HasPrefix(string(e), "NOSCRIPT ") {
v, err = c.Do("EVAL", s.args(s.src, keysAndArgs)...)
}
return v, err
}
// SendHash evaluates the script without waiting for the reply. The script is
// evaluated with the EVALSHA command. The application must ensure that the
// script is loaded by a previous call to Send, Do or Load methods.
func (s *Script) SendHash(c Conn, keysAndArgs ...interface{}) error {
return c.Send("EVALSHA", s.args(s.hash, keysAndArgs)...)
}
// Send evaluates the script without waiting for the reply.
func (s *Script) Send(c Conn, keysAndArgs ...interface{}) error {
return c.Send("EVAL", s.args(s.src, keysAndArgs)...)
}
// Load loads the script without evaluating it.
func (s *Script) Load(c Conn) error {
_, err := c.Do("SCRIPT", "LOAD", s.src)
return err
}

@ -0,0 +1,142 @@
package redis
import (
"context"
"fmt"
"time"
"github.com/bilibili/Kratos/pkg/log"
"github.com/bilibili/Kratos/pkg/net/trace"
)
const (
_traceComponentName = "library/cache/redis"
_tracePeerService = "redis"
_traceSpanKind = "client"
_slowLogDuration = time.Millisecond * 250
)
var _internalTags = []trace.Tag{
trace.TagString(trace.TagSpanKind, _traceSpanKind),
trace.TagString(trace.TagComponent, _traceComponentName),
trace.TagString(trace.TagPeerService, _tracePeerService),
}
type traceConn struct {
// tr for pipeline, if tr != nil meaning on pipeline
tr trace.Trace
ctx context.Context
// connTag include e.g. ip,port
connTags []trace.Tag
// origin redis conn
Conn
pending int
}
func (t *traceConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
statement := getStatement(commandName, args...)
defer slowLog(statement, time.Now())
root, ok := trace.FromContext(t.ctx)
// NOTE: ignored empty commandName
// current sdk will Do empty command after pipeline finished
if !ok || commandName == "" {
return t.Conn.Do(commandName, args...)
}
tr := root.Fork("", "Redis:"+commandName)
tr.SetTag(_internalTags...)
tr.SetTag(t.connTags...)
tr.SetTag(trace.TagString(trace.TagDBStatement, statement))
reply, err = t.Conn.Do(commandName, args...)
tr.Finish(&err)
return
}
func (t *traceConn) Send(commandName string, args ...interface{}) error {
statement := getStatement(commandName, args...)
defer slowLog(statement, time.Now())
t.pending++
root, ok := trace.FromContext(t.ctx)
if !ok {
return t.Conn.Send(commandName, args...)
}
if t.tr == nil {
t.tr = root.Fork("", "Redis:Pipeline")
t.tr.SetTag(_internalTags...)
t.tr.SetTag(t.connTags...)
}
t.tr.SetLog(
trace.Log(trace.LogEvent, "Send"),
trace.Log("db.statement", statement),
)
err := t.Conn.Send(commandName, args...)
if err != nil {
t.tr.SetTag(trace.TagBool(trace.TagError, true))
t.tr.SetLog(
trace.Log(trace.LogEvent, "Send Fail"),
trace.Log(trace.LogMessage, err.Error()),
)
}
return err
}
func (t *traceConn) Flush() error {
defer slowLog("Flush", time.Now())
if t.tr == nil {
return t.Conn.Flush()
}
t.tr.SetLog(trace.Log(trace.LogEvent, "Flush"))
err := t.Conn.Flush()
if err != nil {
t.tr.SetTag(trace.TagBool(trace.TagError, true))
t.tr.SetLog(
trace.Log(trace.LogEvent, "Flush Fail"),
trace.Log(trace.LogMessage, err.Error()),
)
}
return err
}
func (t *traceConn) Receive() (reply interface{}, err error) {
defer slowLog("Receive", time.Now())
if t.tr == nil {
return t.Conn.Receive()
}
t.tr.SetLog(trace.Log(trace.LogEvent, "Receive"))
reply, err = t.Conn.Receive()
if err != nil {
t.tr.SetTag(trace.TagBool(trace.TagError, true))
t.tr.SetLog(
trace.Log(trace.LogEvent, "Receive Fail"),
trace.Log(trace.LogMessage, err.Error()),
)
}
if t.pending > 0 {
t.pending--
}
if t.pending == 0 {
t.tr.Finish(nil)
t.tr = nil
}
return reply, err
}
func (t *traceConn) WithContext(ctx context.Context) Conn {
t.ctx = ctx
return t
}
func slowLog(statement string, now time.Time) {
du := time.Since(now)
if du > _slowLogDuration {
log.Warn("%s slow log statement: %s time: %v", _tracePeerService, statement, du)
}
}
func getStatement(commandName string, args ...interface{}) (res string) {
res = commandName
if len(args) > 0 {
res = fmt.Sprintf("%s %v", commandName, args[0])
}
return
}
Loading…
Cancel
Save