You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
kratos/pkg/cache/memcache/memcache.go

377 lines
12 KiB

package memcache
import (
"context"
"github.com/bilibili/kratos/pkg/container/pool"
xtime "github.com/bilibili/kratos/pkg/time"
)
const (
// Flag, 15(encoding) bit+ 17(compress) bit
// FlagRAW default flag.
FlagRAW = uint32(0)
// FlagGOB gob encoding.
FlagGOB = uint32(1) << 0
// FlagJSON json encoding.
FlagJSON = uint32(1) << 1
// FlagProtobuf protobuf
FlagProtobuf = uint32(1) << 2
_flagEncoding = uint32(0xFFFF8000)
// FlagGzip gzip compress.
FlagGzip = uint32(1) << 15
// left mv 31??? not work!!!
flagLargeValue = uint32(1) << 30
)
// Item is an reply to be got or stored in a memcached server.
type Item struct {
// Key is the Item's key (250 bytes maximum).
Key string
// Value is the Item's value.
Value []byte
// Object is the Item's object for use codec.
Object interface{}
// Flags are server-opaque flags whose semantics are entirely
// up to the app.
Flags uint32
// Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
Expiration int32
// Compare and swap ID.
cas uint64
}
// Conn represents a connection to a Memcache server.
// Command Reference: https://github.com/memcached/memcached/wiki/Commands
type Conn interface {
// Close closes the connection.
Close() error
// Err returns a non-nil value if the connection is broken. The returned
// value is either the first non-nil value returned from the underlying
// network connection or a protocol parsing error. Applications should
// close broken connections.
Err() error
// Add writes the given item, if no value already exists for its key.
// ErrNotStored is returned if that condition is not met.
Add(item *Item) error
// Set writes the given item, unconditionally.
Set(item *Item) error
// Replace writes the given item, but only if the server *does* already
// hold data for this key.
Replace(item *Item) error
// Get sends a command to the server for gets data.
Get(key string) (*Item, error)
// GetMulti is a batch version of Get. The returned map from keys to items
// may have fewer elements than the input slice, due to memcache cache
// misses. Each key must be at most 250 bytes in length.
// If no error is returned, the returned map will also be non-nil.
GetMulti(keys []string) (map[string]*Item, error)
// Delete deletes the item with the provided key.
// The error ErrNotFound is returned if the item didn't already exist in
// the cache.
Delete(key string) error
// Increment atomically increments key by delta. The return value is the
// new value after being incremented or an error. If the value didn't exist
// in memcached the error is ErrNotFound. The value in memcached must be
// an decimal number, or an error will be returned.
// On 64-bit overflow, the new value wraps around.
Increment(key string, delta uint64) (newValue uint64, err error)
// Decrement atomically decrements key by delta. The return value is the
// new value after being decremented or an error. If the value didn't exist
// in memcached the error is ErrNotFound. The value in memcached must be
// an decimal number, or an error will be returned. On underflow, the new
// value is capped at zero and does not wrap around.
Decrement(key string, delta uint64) (newValue uint64, err error)
// CompareAndSwap writes the given item that was previously returned by
// Get, if the value was neither modified or evicted between the Get and
// the CompareAndSwap calls. The item's Key should not change between calls
// but all other item fields may differ. ErrCASConflict is returned if the
// value was modified in between the calls.
// ErrNotStored is returned if the value was evicted in between the calls.
CompareAndSwap(item *Item) error
// Touch updates the expiry for the given key. The seconds parameter is
// either a Unix timestamp or, if seconds is less than 1 month, the number
// of seconds into the future at which time the item will expire.
// ErrNotFound is returned if the key is not in the cache. The key must be
// at most 250 bytes in length.
Touch(key string, seconds int32) (err error)
// Scan converts value read from the memcache into the following
// common Go types and special types:
//
// *string
// *[]byte
// *interface{}
//
Scan(item *Item, v interface{}) (err error)
// Add writes the given item, if no value already exists for its key.
// ErrNotStored is returned if that condition is not met.
AddContext(ctx context.Context, item *Item) error
// Set writes the given item, unconditionally.
SetContext(ctx context.Context, item *Item) error
// Replace writes the given item, but only if the server *does* already
// hold data for this key.
ReplaceContext(ctx context.Context, item *Item) error
// Get sends a command to the server for gets data.
GetContext(ctx context.Context, key string) (*Item, error)
// GetMulti is a batch version of Get. The returned map from keys to items
// may have fewer elements than the input slice, due to memcache cache
// misses. Each key must be at most 250 bytes in length.
// If no error is returned, the returned map will also be non-nil.
GetMultiContext(ctx context.Context, keys []string) (map[string]*Item, error)
// Delete deletes the item with the provided key.
// The error ErrNotFound is returned if the item didn't already exist in
// the cache.
DeleteContext(ctx context.Context, key string) error
// Increment atomically increments key by delta. The return value is the
// new value after being incremented or an error. If the value didn't exist
// in memcached the error is ErrNotFound. The value in memcached must be
// an decimal number, or an error will be returned.
// On 64-bit overflow, the new value wraps around.
IncrementContext(ctx context.Context, key string, delta uint64) (newValue uint64, err error)
// Decrement atomically decrements key by delta. The return value is the
// new value after being decremented or an error. If the value didn't exist
// in memcached the error is ErrNotFound. The value in memcached must be
// an decimal number, or an error will be returned. On underflow, the new
// value is capped at zero and does not wrap around.
DecrementContext(ctx context.Context, key string, delta uint64) (newValue uint64, err error)
// CompareAndSwap writes the given item that was previously returned by
// Get, if the value was neither modified or evicted between the Get and
// the CompareAndSwap calls. The item's Key should not change between calls
// but all other item fields may differ. ErrCASConflict is returned if the
// value was modified in between the calls.
// ErrNotStored is returned if the value was evicted in between the calls.
CompareAndSwapContext(ctx context.Context, item *Item) error
// Touch updates the expiry for the given key. The seconds parameter is
// either a Unix timestamp or, if seconds is less than 1 month, the number
// of seconds into the future at which time the item will expire.
// ErrNotFound is returned if the key is not in the cache. The key must be
// at most 250 bytes in length.
TouchContext(ctx context.Context, key string, seconds int32) (err error)
}
// Config memcache config.
type Config struct {
*pool.Config
Name string // memcache name, for trace
Proto string
Addr string
DialTimeout xtime.Duration
ReadTimeout xtime.Duration
WriteTimeout xtime.Duration
}
// Memcache memcache client
type Memcache struct {
pool *Pool
}
// Reply is the result of Get
type Reply struct {
err error
item *Item
conn Conn
closed bool
}
// Replies is the result of GetMulti
type Replies struct {
err error
items map[string]*Item
usedItems map[string]struct{}
conn Conn
closed bool
}
// New get a memcache client
func New(cfg *Config) *Memcache {
return &Memcache{pool: NewPool(cfg)}
}
// Close close connection pool
func (mc *Memcache) Close() error {
return mc.pool.Close()
}
// Conn direct get a connection
func (mc *Memcache) Conn(ctx context.Context) Conn {
return mc.pool.Get(ctx)
}
// Set writes the given item, unconditionally.
func (mc *Memcache) Set(ctx context.Context, item *Item) (err error) {
conn := mc.pool.Get(ctx)
err = conn.SetContext(ctx, item)
conn.Close()
return
}
// Add writes the given item, if no value already exists for its key.
// ErrNotStored is returned if that condition is not met.
func (mc *Memcache) Add(ctx context.Context, item *Item) (err error) {
conn := mc.pool.Get(ctx)
err = conn.AddContext(ctx, item)
conn.Close()
return
}
// Replace writes the given item, but only if the server *does* already hold data for this key.
func (mc *Memcache) Replace(ctx context.Context, item *Item) (err error) {
conn := mc.pool.Get(ctx)
err = conn.ReplaceContext(ctx, item)
conn.Close()
return
}
// CompareAndSwap writes the given item that was previously returned by Get
func (mc *Memcache) CompareAndSwap(ctx context.Context, item *Item) (err error) {
conn := mc.pool.Get(ctx)
err = conn.CompareAndSwapContext(ctx, item)
conn.Close()
return
}
// Get sends a command to the server for gets data.
func (mc *Memcache) Get(ctx context.Context, key string) *Reply {
conn := mc.pool.Get(ctx)
item, err := conn.GetContext(ctx, key)
if err != nil {
conn.Close()
}
return &Reply{err: err, item: item, conn: conn}
}
// Item get raw Item
func (r *Reply) Item() *Item {
return r.item
}
// Scan converts value, read from the memcache
func (r *Reply) Scan(v interface{}) (err error) {
if r.err != nil {
return r.err
}
err = r.conn.Scan(r.item, v)
if !r.closed {
r.conn.Close()
r.closed = true
}
return
}
// GetMulti is a batch version of Get
func (mc *Memcache) GetMulti(ctx context.Context, keys []string) (*Replies, error) {
conn := mc.pool.Get(ctx)
items, err := conn.GetMultiContext(ctx, keys)
rs := &Replies{err: err, items: items, conn: conn, usedItems: make(map[string]struct{}, len(keys))}
if (err != nil) || (len(items) == 0) {
rs.Close()
}
return rs, err
}
// Close close rows.
func (rs *Replies) Close() (err error) {
if !rs.closed {
err = rs.conn.Close()
rs.closed = true
}
return
}
// Item get Item from rows
func (rs *Replies) Item(key string) *Item {
return rs.items[key]
}
// Scan converts value, read from key in rows
func (rs *Replies) Scan(key string, v interface{}) (err error) {
if rs.err != nil {
return rs.err
}
item, ok := rs.items[key]
if !ok {
rs.Close()
return ErrNotFound
}
rs.usedItems[key] = struct{}{}
err = rs.conn.Scan(item, v)
if (err != nil) || (len(rs.items) == len(rs.usedItems)) {
rs.Close()
}
return
}
// Keys keys of result
func (rs *Replies) Keys() (keys []string) {
keys = make([]string, 0, len(rs.items))
for key := range rs.items {
keys = append(keys, key)
}
return
}
// Touch updates the expiry for the given key.
func (mc *Memcache) Touch(ctx context.Context, key string, timeout int32) (err error) {
conn := mc.pool.Get(ctx)
err = conn.TouchContext(ctx, key, timeout)
conn.Close()
return
}
// Delete deletes the item with the provided key.
func (mc *Memcache) Delete(ctx context.Context, key string) (err error) {
conn := mc.pool.Get(ctx)
err = conn.DeleteContext(ctx, key)
conn.Close()
return
}
// Increment atomically increments key by delta.
func (mc *Memcache) Increment(ctx context.Context, key string, delta uint64) (newValue uint64, err error) {
conn := mc.pool.Get(ctx)
newValue, err = conn.IncrementContext(ctx, key, delta)
conn.Close()
return
}
// Decrement atomically decrements key by delta.
func (mc *Memcache) Decrement(ctx context.Context, key string, delta uint64) (newValue uint64, err error) {
conn := mc.pool.Get(ctx)
newValue, err = conn.DecrementContext(ctx, key, delta)
conn.Close()
return
}