原创不易,求分享、求一键三连
缓存一般是用来加速数据访问的效率,在获取数据耗时高的场景下使用缓存可以有效的提高数据获取的效率。
比如,先从memcached中获取数据,如果没有则查询mysql中的数据得到结果写入到memcached中然后返回,下次请求就能够从memcached中获取数据直接返回。
在行业中使用比较多的缓存数据库有Redis和Memcached。
今天用go实现Memcached的驱动程序,深入了解Memcached和咱们平时所写的业务代码如何进行数据交互的协议和原理。
什么是memcached
Memcached是LiveJournal旗下Danga Interactive公司的Brad Fitzpatric为首开发的一款自由开源、高性能的key-value缓存数据库软件。
Mecached协议
Mecached服务和应用程序是不同机器不同的进程,双方进行数据交互通讯涉及到tcp和通讯协议,在memcache中协议有两种类型一种是文本行方式,另一种是非结构化数据方式。
我们挑选文本行协议来实现,大多数Mecached的客户端也是采用文本行协议来开发的因为比较简单,特定格式文本字符串来约定数据交互,如下以客户端发送命令举例:
<command name> <key> <flags> <exptime> <bytes>rn
<data block>rn
<command name>
是协议的命令,大致分为三类:
-
存储命令:set、 add、replace、append、prepend、cas。 -
获取命令:get、gets。 -
其他命令:version、stats
<key>
要求存储数据的关键字;由于memached底层实现的限制,key的长度限制在250个字符内,并且key中不能包含控制字符或空格。
<flags>
是一个16位无符号整数。
<exptime>
是存储超时时间。如果值为0表示该数据项永不超时;
过期时间实现限制,过期时间要么是Unix时间(从1970-1-1开始计算的秒数),要么是从当前时间开始计算的秒数。
该值不能超过30天,否则服务器将该参数当做真正的Unix时间而不是当前时间的一个偏移值。
<bytes>
是随后数据的字节数,不包括终结符rn。<bytes>
有可能是0,它后面将是一个空的数据块。
<data block>
存储数据流。
客户端以字符串的方式向服务端发送文本行的内容服务器端会返回对应的执行结果数据,也有返回错误的情况,memcache也对错误的数据格式定义三种不同错误类型的三种格式让错误的返回简单:
-
ERRORrn
说明客户端发送了一个不存在命令
-
CLIENT_ERROR rn
说明在输入行中存在某种类型的客户端错误,例如输入的信息没有遵循memcached的协议
-
SERVER_ERROR rn
说明服务端存在某种类型的错误导致致命命令无法执行。
<error>
是具有可读性的错误字符串。
当服务端错误发生后,将会导致服务器将不会再提供服务,服务器在发送该错误信息行后将关闭链接。只有在此场景下,服务器才会关闭与客户端链接。
以下就具体罗列memcached常用包含客户端发送和响应的命令格式列表:
需要注意的是command是区分大小写的,客户端使用tcp连接服务端发送客户端文本行命令,发送成功后等待服务器返回数据,根据格式解析获取需要的返回值这就是一个简单的协议命令执行流程。
Golang实现客户端驱动
有了对memache协议的了解现在来实现通讯就比较简单,首先需要定义Client结构体,保存客户端一些基本配置信息及链接信息:
type Client struct {
Timeout time.Duration
MaxIdleConns int
lock sync.Mutex
addr net.Addr
conns []*conn
}
-
Timeout tcp链接读写超时 -
conns 是memcache链接池存放的数组 -
MaxIdleConns 是Idle链接的数量 -
lock 是操作conns时加锁 -
addr则是链接的memcache的地址
memcached的单独一个conn连接结构体定义
type conn struct {
nc net.Conn
rw *bufio.ReadWriter
addr net.Addr
c *Client
}
-
nc 是建立好的tcp网络链接 -
rw 为了方便数据发送和读取设置bufio的ReadWriter -
addr 存储memcached地址 -
c 存储客户端的引用
下面是看如何获取链接和使用完之后如何将链接放回到链接池中
//获取memcached的链接
func (c *Client) getFreeConn() (cn *conn, ok bool) {
c.lock.Lock()
defer c.lock.Unlock()
if c.conns == nil {
return nil, false
}
freelist := c.conns
if len(freelist) == 0 {
return nil, false
}
cn = freelist[len(freelist)-1]
c.conns = freelist[:len(freelist)-1]
return cn, true
}
//将使用完的链接放回到conns中
func (c *Client) putFreeConn(cn *conn) {
c.lock.Lock()
defer c.lock.Unlock()
if c.conns == nil {
c.conns = make([]*conn, 0)
}
freelist := c.conns
if len(freelist) >= c.maxIdleConns() {
cn.nc.Close()
return
}
c.conns = append(freelist, cn)
}
接下来以GET命令为例,来详细看如何进行网络传输和协议解析的实现
func (c *Client) Get(key string) (item *Item, err error) {
//check key len 验证key是否长于250字符
if !legalKey(key) {
err = ErrMalformedKey
return
}
keys := []string{key}
cn, err := c.getConn() //获取memcached链接
defer cn.condRelease(&err) // 方法执行完之后将链接release,返回到链接池中
if err != nil {
return
}
rw := cn.rw
//将gets 命令用文本行协议写入到rw中
if _, err = fmt.Fprintf(rw, "gets %srn", strings.Join(keys, " ")); err != nil {
return
}
if err = rw.Flush(); err != nil {
return
}
//获取GET命令发送之后等待和获取返回的响应数据
if err = parseGetResponse(rw.Reader, func(it *Item) { item = it }); err != nil {
return
}
if item == nil {
err = ErrCacheMiss
}
return
}
func parseGetResponse(r *bufio.Reader, cb func(*Item)) error {
for {
line, err := r.ReadSlice('n')
if err != nil {
return err
}
if bytes.Equal(line, resultEnd) { //如果获取是 ENDrn 则数据返回完,则返回
return nil
}
it := new(Item)
size, err := scanGetResponseLine(line, it)//先根据格式获取第一行数据和<data> 部分的大小
if err != nil {
return err
}
//根据bytes获取数据
it.Value = make([]byte, size+2)
_, err = io.ReadFull(r, it.Value)
if err != nil {
it.Value = nil
return err
}
if !bytes.HasSuffix(it.Value, crlf) {
it.Value = nil
return fmt.Errorf("memcache: corrupt get result read")
}
it.Value = it.Value[:size]
cb(it)
}
}
//根据返回数据格式获取返回值设置到Item结构中。
func scanGetResponseLine(line []byte, it *Item) (size int, err error) {
// 返回的数据格式 VALUE <key> <falgs> <bytes> <casid>
pattern := "VALUE %s %d %d %drn"
dest := []interface{}{&it.Key, &it.Flags, &size, &it.casid}
if bytes.Count(line, space) == 3 {
pattern = "VALUE %s %d %drn"
dest = dest[:3]
}
n, err := fmt.Sscanf(string(line), pattern, dest...)
if err != nil || n != len(dest) {
return -1, fmt.Errorf("memcache: unexpected line in get response: %q", line)
}
return size, nil
}
//判断key是否符合要求
func legalKey(key string) bool {
if len(key) > 250 {
return false
}
for i := 0; i < len(key); i++ {
if key[i] <= ' ' || key[i] == 0x7f {
return false
}
}
return true
}
其它命令不在详细描述,完整代码如下:
package memcache
import (
"bufio"
"bytes"
"errors"
"fmt"
"io"
"net"
"strconv"
"strings"
"sync"
"time"
)
//memcached -m 1024 -u root -l 127.0.0.1 -p 12001 -c 55535
//# memcached -d -m 10 -u root -l 127.0.0.1 -p 12001 -c 256 -P /tmp/memcached.pid
//-d选项是启动一个守护进程
//-m是分配给Memcache使用的内存数量,单位是MB,我这里是10MB
//-u是运行Memcache的用户
//-l是监听的服务器IP地址,如果有多个地址的话,我这里指定了服务器的IP地址127.0.0.1
//-p是设置 Memcache监听的端口,我这里设置了12001,最好是1024以上的端口
//-c选项是最大运行的并发连接数,默认是1024,我这里设置了 256,按照你服务器的负载量来设定
//-P是设置保存Memcache的pid文件,我这里是保存在 /tmp/memcached.pid
//停止进程:# kill `cat /tmp/memcached.pid`
const (
DefaultTimeout = 100 * time.Millisecond
DefaultMaxIdleConns = 40
)
var (
resultClientErrorPrefix = []byte("CLIENT_ERROR")
resultErrPrefix = []byte("ERROR")
resultServerErrPrefix = []byte("SERVER_ERROR")
crlf = []byte("rn")
space = []byte(" ")
resultOK = []byte("OKrn")
resultStored = []byte("STOREDrn")
resultNotStored = []byte("NOT_STOREDrn")
resultExists = []byte("EXISTSrn")
resultNotFound = []byte("NOT_FOUNDrn")
resultDeleted = []byte("DELETEDrn")
resultEnd = []byte("ENDrn")
resultTouched = []byte("TOUCHEDrn")
versionPrefix = []byte("VERSION")
ErrMalformedKey = errors.New("malformed: key is too long or contains invalid characters")
ErrCacheMiss = errors.New("memcache: cache miss")
ErrCASConflict = errors.New("memcache: compare-and-swap conflict")
ErrNotStored = errors.New("memcache: item not stored")
)
type Client struct {
Timeout time.Duration
MaxIdleConns int
lock sync.Mutex
addr net.Addr
conns []*conn
}
func NewClient(timeout time.Duration, maxIdleConns int, addr net.Addr) *Client {
return &Client{
Timeout: timeout,
MaxIdleConns: maxIdleConns,
lock: sync.Mutex{},
addr: addr,
conns: nil,
}
}
type Item struct {
// Key is the Item's key (250 bytes maximum).
Key string
// Value is the Item's value.
Value []byte
// Flags are server-opaque flags whose semantics are entirely
// up to the app.
Flags uint32
// Expiration is the cache expiration time, in seconds: either a relative
// time from now (up to 1 month), or an absolute Unix epoch time.
// Zero means the Item has no expiration time.
Expiration int32
// Compare and swap ID.
casid uint64
}
func (c *Client) maxIdleConns() int {
if c.MaxIdleConns > 0 {
return c.MaxIdleConns
}
return DefaultMaxIdleConns
}
func (c *Client) netTimeout() time.Duration {
if c.Timeout != 0 {
return c.Timeout
}
return DefaultTimeout
}
type conn struct {
nc net.Conn
rw *bufio.ReadWriter
addr net.Addr
c *Client
}
// 设置超时时间
func (cn *conn) extendDeadline() {
cn.nc.SetDeadline(time.Now().Add(cn.c.netTimeout()))
}
// Release 如果是正常的err 则放回到conns中,如果不是这直接close掉conn
func (cn *conn) condRelease(err *error) {
if *err == nil || resumableError(*err) {
cn.release()
} else {
fmt.Println("xxx", fmt.Sprintf("%s", (*err).Error()))
cn.nc.Close()
}
}
// release returns this connection back to the client's free pool
func (cn *conn) release() {
cn.c.putFreeConn(cn)
}
func (c *Client) putFreeConn(cn *conn) {
c.lock.Lock()
defer c.lock.Unlock()
if c.conns == nil {
c.conns = make([]*conn, 0)
}
freelist := c.conns
if len(freelist) >= c.maxIdleConns() {
cn.nc.Close()
return
}
c.conns = append(freelist, cn)
}
func (c *Client) getFreeConn() (cn *conn, ok bool) {
c.lock.Lock()
defer c.lock.Unlock()
if c.conns == nil {
return nil, false
}
freelist := c.conns
if len(freelist) == 0 {
return nil, false
}
cn = freelist[len(freelist)-1]
c.conns = freelist[:len(freelist)-1]
return cn, true
}
type ConnectTimeoutError struct {
Addr net.Addr
}
func (cte *ConnectTimeoutError) Error() string {
return "memcache: connect timeout to " + cte.Addr.String()
}
//获取memcached连接
func (c *Client) getConn() (*conn, error) {
cn, ok := c.getFreeConn()
if ok {
cn.extendDeadline()
return cn, nil
}
nc, err := c.dial(c.addr)
if err != nil {
return nil, err
}
cn = &conn{
nc: nc,
addr: c.addr,
rw: bufio.NewReadWriter(bufio.NewReader(nc), bufio.NewWriter(nc)),
c: c,
}
cn.extendDeadline()
return cn, nil
}
func (c *Client) dial(addr net.Addr) (net.Conn, error) {
nc, err := net.DialTimeout(addr.Network(), addr.String(), c.netTimeout())
if err == nil {
return nc, nil
}
if ne, ok := err.(net.Error); ok && ne.Timeout() {
return nil, &ConnectTimeoutError{Addr: addr}
}
return nil, err
}
func (c *Client) Get(key string) (item *Item, err error) {
//check key len
if !legalKey(key) {
err = ErrMalformedKey
return
}
keys := []string{key}
cn, err := c.getConn()
defer cn.condRelease(&err)
if err != nil {
return
}
rw := cn.rw
if _, err = fmt.Fprintf(rw, "gets %srn", strings.Join(keys, " ")); err != nil {
return
}
if err = rw.Flush(); err != nil {
return
}
if err = parseGetResponse(rw.Reader, func(it *Item) { item = it }); err != nil {
return
}
if item == nil {
err = ErrCacheMiss
}
return
}
func (c *Client) GetMulti(keys []string) (map[string]*Item, error) {
var lk sync.Mutex
m := make(map[string]*Item)
addItemToMap := func(it *Item) {
lk.Lock()
defer lk.Unlock()
m[it.Key] = it
}
for _, key := range keys {
if !legalKey(key) {
return nil, ErrMalformedKey
}
}
cn, err := c.getConn()
defer cn.condRelease(&err)
if err != nil {
return nil, err
}
if _, err = fmt.Fprintf(cn.rw, "gets %srn", strings.Join(keys, " ")); err != nil {
return nil, err
}
if err = cn.rw.Flush(); err != nil {
return nil, err
}
if err = parseGetResponse(cn.rw.Reader, addItemToMap); err != nil {
return nil, err
}
return m, err
}
func (c *Client) Touch(key string, seconds int32) (err error) {
cn, err := c.getConn()
if err != nil {
return
}
defer cn.condRelease(&err)
if _, err = fmt.Fprintf(cn.rw, "touch %s %drn", key, seconds); err != nil {
return
}
if err = cn.rw.Flush(); err != nil {
return
}
line, err := cn.rw.ReadSlice('n')
if err != nil {
return
}
switch {
case bytes.Equal(line, resultTouched):
break
case bytes.Equal(line, resultNotFound):
return ErrCacheMiss
default:
return fmt.Errorf("memcache: unexpected response line from touch: %q", string(line))
}
return nil
}
func (c *Client) Add(item *Item) error {
return c.onItem(item, func(client *Client, rw *bufio.ReadWriter, item *Item) error {
return client.populateOne(rw, "add", item)
})
}
func (c *Client) Set(item *Item) error {
return c.onItem(item, func(client *Client, rw *bufio.ReadWriter, item *Item) error {
return client.populateOne(rw, "set", item)
})
}
func (c *Client) CompareAndSwap(item *Item) error {
return c.onItem(item, func(client *Client, rw *bufio.ReadWriter, item *Item) error {
return client.populateOne(rw, "cas", item)
})
}
func (c *Client) Replace(item *Item) error {
return c.onItem(item, func(client *Client, rw *bufio.ReadWriter, item *Item) error {
return client.populateOne(rw, "replace", item)
})
}
func (c *Client) Delete(key string) error {
if !legalKey(key) {
return ErrMalformedKey
}
cn, err := c.getConn()
if err != nil {
return err
}
defer cn.condRelease(&err)
return writeExpectf(cn.rw, resultDeleted, "delete %srn", key)
}
func (c *Client) FlushAll() error {
cn, err := c.getConn()
if err != nil {
return err
}
defer cn.condRelease(&err)
return writeExpectf(cn.rw, resultDeleted, "flush_allrn")
}
func (c *Client) Version() error {
cn, err := c.getConn()
defer cn.condRelease(&err)
if err != nil {
return err
}
return func(rw *bufio.ReadWriter) error {
if _, e := fmt.Fprintf(rw, "versionrn"); e != nil {
return err
}
if e := rw.Flush(); e != nil {
return e
}
line, e := rw.ReadSlice('n')
if e != nil {
return e
}
switch {
case bytes.HasPrefix(line, versionPrefix):
break
default:
return fmt.Errorf("memcache: unexpected response line from ping: %q", string(line))
}
return nil
}(cn.rw)
}
func (c *Client) Increment(key string, delta uint64) (newValue uint64, err error) {
return c.incrDecr("incr", key, delta)
}
func (c *Client) Decrement(key string, delta uint64) (newValue uint64, err error) {
return c.incrDecr("decr", key, delta)
}
func (c *Client) onItem(item *Item, fn func(*Client, *bufio.ReadWriter, *Item) error) error {
cn, err := c.getConn()
defer cn.condRelease(&err)
if err != nil {
return err
}
if err = fn(c, cn.rw, item); err != nil {
return err
}
return nil
}
func parseGetResponse(r *bufio.Reader, cb func(*Item)) error {
for {
line, err := r.ReadSlice('n')
if err != nil {
return err
}
if bytes.Equal(line, resultEnd) {
return nil
}
it := new(Item)
size, err := scanGetResponseLine(line, it)
if err != nil {
return err
}
it.Value = make([]byte, size+2)
_, err = io.ReadFull(r, it.Value)
if err != nil {
it.Value = nil
return err
}
if !bytes.HasSuffix(it.Value, crlf) {
it.Value = nil
return fmt.Errorf("memcache: corrupt get result read")
}
it.Value = it.Value[:size]
cb(it)
}
}
func scanGetResponseLine(line []byte, it *Item) (size int, err error) {
pattern := "VALUE %s %d %d %drn"
dest := []interface{}{&it.Key, &it.Flags, &size, &it.casid}
if bytes.Count(line, space) == 3 {
pattern = "VALUE %s %d %drn"
dest = dest[:3]
}
n, err := fmt.Sscanf(string(line), pattern, dest...)
if err != nil || n != len(dest) {
return -1, fmt.Errorf("memcache: unexpected line in get response: %q", line)
}
return size, nil
}
func legalKey(key string) bool {
if len(key) > 250 {
return false
}
for i := 0; i < len(key); i++ {
if key[i] <= ' ' || key[i] == 0x7f {
return false
}
}
return true
}
func resumableError(err error) bool {
switch err {
case ErrCacheMiss, ErrCASConflict, ErrNotStored, ErrMalformedKey:
return true
}
return false
}
func (c *Client) populateOne(rw *bufio.ReadWriter, verb string, item *Item) error {
if !legalKey(item.Key) {
return ErrMalformedKey
}
var err error
if verb == "cas" {
_, err = fmt.Fprintf(rw, "%s %s %d %d %d %drn",
verb, item.Key, item.Flags, item.Expiration, len(item.Value), item.casid)
} else {
_, err = fmt.Fprintf(rw, "%s %s %d %d %drn",
verb, item.Key, item.Flags, item.Expiration, len(item.Value))
}
if err != nil {
return err
}
if _, err = rw.Write(item.Value); err != nil {
return err
}
if _, err = rw.Write(crlf); err != nil {
return err
}
if err = rw.Flush(); err != nil {
return err
}
line, err := rw.ReadSlice('n')
if err != nil {
return err
}
switch {
case bytes.Equal(line, resultStored):
return nil
case bytes.Equal(line, resultNotStored):
return ErrNotStored
case bytes.Equal(line, resultExists):
return ErrCASConflict
case bytes.Equal(line, resultNotFound):
return ErrCacheMiss
}
return fmt.Errorf("memcache: unexpected response line from %q: %q", verb, string(line))
}
func writeExpectf(rw *bufio.ReadWriter, expect []byte, format string, args ...interface{}) error {
line, err := writeReadLine(rw, format, args...)
if err != nil {
return err
}
switch {
case bytes.Equal(line, resultOK):
return nil
case bytes.Equal(line, expect):
return nil
case bytes.Equal(line, resultNotStored):
return ErrNotStored
case bytes.Equal(line, resultExists):
return ErrCASConflict
case bytes.Equal(line, resultNotFound):
return ErrCacheMiss
}
return fmt.Errorf("memcache: unexpected response line: %q", string(line))
}
func writeReadLine(rw *bufio.ReadWriter, format string, args ...interface{}) ([]byte, error) {
_, err := fmt.Fprintf(rw, format, args...)
if err != nil {
return nil, err
}
if e := rw.Flush(); e != nil {
return nil, e
}
line, err := rw.ReadSlice('n')
return line, err
}
func (c *Client) incrDecr(verb, key string, delta uint64) (uint64, error) {
var val uint64
cn, err := c.getConn()
defer cn.condRelease(&err)
if err != nil {
return 0, err
}
func(rw *bufio.ReadWriter) error {
line, e := writeReadLine(rw, "%s %s %drn", verb, key, delta)
if e != nil {
return e
}
switch {
case bytes.Equal(line, resultNotFound):
return ErrCacheMiss
case bytes.HasPrefix(line, resultClientErrorPrefix):
errMsg := line[len(resultClientErrorPrefix) : len(line)-2]
return errors.New("memcache: client error: " + string(errMsg))
}
val, e = strconv.ParseUint(string(line[:len(line)-2]), 10, 64)
if e != nil {
return e
}
return nil
}(cn.rw)
return val, err
}
测试代码,启动一个http服务器先设置到memcached中,通过/hello 接口从memcached中获取对应的值
package main
import (
"fmt"
"memcache_go/memcache"
"net"
"net/http"
"time"
)
var client *memcache.Client
func IndexHandler(w http.ResponseWriter, r *http.Request) {
ret, err := client.Get("tcp_key")
if err != nil {
fmt.Fprintln(w, "err ")
}
str := ""
if ret != nil {
str = string(ret.Value)
fmt.Fprintln(w, "hello world", str)
} else {
fmt.Fprintln(w, "nil")
}
}
func touchHandler(w http.ResponseWriter, r *http.Request) {
err := client.Touch("tcp_key", 1000)
if err != nil {
fmt.Fprintln(w, "err ")
return
}
fmt.Fprintln(w, "succ")
}
func main() {
addr, err := net.ResolveTCPAddr("tcp", "127.0.0.1:12001")
if err != nil {
fmt.Println(fmt.Sprintf("get err %v", err))
return
}
client = memcache.NewClient(30*time.Second, 30, addr)
err = client.Add(&memcache.Item{
Key: "tcp_key",
Value: []byte(fmt.Sprintf("tcp_key_value_%d", time.Now().UnixNano())),
Flags: 0,
Expiration: 0,
})
if err != nil {
fmt.Println("执行失败")
//return
}
http.HandleFunc("/get", IndexHandler)
http.HandleFunc("/touch", touchHandler)
http.ListenAndServe("127.0.0.1:8000", nil)
//fmt.Println("memcache_test...")
for {
time.Sleep(1000 * 30)
}
}
启动memcached服务进行测试
memcached -m 1024 -u root -l 127.0.0.1 -p 12001 -c 55535
//-d选项是启动一个守护进程
//-m是分配给Memcache使用的内存数量,单位是MB,我这里是10MB
//-u是运行Memcache的用户
//-l是监听的服务器IP地址,如果有多个地址的话,我这里指定了服务器的IP地址127.0.0.1
//-p是设置 Memcache监听的端口,我这里设置了12001,最好是1024以上的端口
//-c选项是最大运行的并发连接数,默认是1024,我这里设置了 256,按照你服务器的负载量来设定
//-P是设置保存Memcache的pid文件,我这里是保存在 /tmp/memcached.pid
//停止进程:# kill `cat /tmp/memcached.pid`
到这里我们大概用了几百行代码实现了一个简单的memcached链接驱动的子集,对应用程序和memcached如何通讯有了大致了解。
原文始发于微信公众号(哆啦安全):500行代码了解缓存客户端驱动原理
- 左青龙
- 微信扫一扫
-
- 右白虎
- 微信扫一扫
-
评论