在四月份的一篇翻译的文章中,我介绍了读写Redis RESP version 2的协议的Go 语言的实现,你可以使用它采用底层的方式读写5.0以及以下版本的Redis。Redis 6.0还在开发之中年底或者明年初就要发布了。Redis 6.0支持多线程I/O,还有客户端缓存。
客户端缓存是未来Redis最重要的特性。如果我们需要快速存储和快速缓存,那么我们就需要在客户端存储数据的子集。这是为了提供小延迟、大规模数据的想法的自然延伸。很多公司都采用了在客户端缓存数据以避免每次都请求redis,但是本地缓存和redis服务器数据之间有延迟,很难保证数据的一致性。Ben Malec在Redis Conf 2018上做了一个关于客户端缓存的演讲,给了Salvatore Sanfilippo以灵感,Salvatore Sanfilippo决定在Redis 6.0中支持客户端缓存的功能。但是为了支持这个功能,使用当前的redis协议很难实现,所以他设计了下一代的Redis协议:RESP3 。
所有的代码都放在了github 上。
RESP3协议
RESP3 协议中增加了很多新的数据类型。
和 RESP version 2 等价的类型
Array: 一个有序 集合,包含N个其它类型Blob string: 二进制安全字符串Simple string: 一个节省空间的非二进制安全字符串Simple error: 一个节省空间的非二进制安全错误码和错误信息Number: 有符号64位整数
RESP3中新加的类型
Null: 单一的空值,代替原先的 RESP v2 的*-1 和$-1 空值。Double: 浮点数Boolean: 布尔类型 true / falseBlob error: 二进制安全的错误码和错误信息Verbatim string: 一个二进制安全字符串,带文本格式, 如命令LATENCY DOCTOR 的输出Map: 一个有序 的键值对Set: 一个无序 的不重复的集合Attribute: 类似Map类型Push: 带外数据,格式类似数组,但是客户端需要检查第一个数据,第一个数据指示了带外数据的类型。注意带外数据并不是一个reply,它是redis主动推送的数据,所以客户端收到带外数据,交给对应的处理方法去处理后,你还需要继续读取你的reply数据Hello: hello命令的返回结果,类似Map类型,仅仅在客户端和服务器建立连接的时候发送Big number: 大数字类型
还有一种新加的stream类型,可以用来传送不确定具体长度的数据。在数据的开头有固定的标识符,在数据传输完毕后在加上这个40字节的标识符,40字节的标志符基本上不会和传输的数据有重复:
$EOF:<40 bytes marker>
... any number of bytes of data here not containing the marker ...
<40 bytes marker>
它以$EOF: 开始接着是40字节标识符,回车换行,接着就是数据,数据结束后是40字节的标识符。因为这种数据类型开始接收时数据长度不确定,所以我们对这种数据类型处理比较特殊,我们会解析出第一行的40字节标志符,后续的读取以及结束标志符需要调用者自己去读取和验证。
可以看到出了保持和原先的RESP version 2的数据类型一致外, RESP3的数据增加了很多数据类型。因为客户端发送的命令的格式比较简单,基本上是字符串的数组形式,所以这么多类型主要用在对服务器返回数据的发送和解析上。
每种数据类型有一个特殊的字符作为标识flag ,我们为每个类型定义了一个常量:
const (
// simple types
TypeBlobString='$' // $\r
\r
TypeSimpleString='+' // +\r
TypeSimpleError='-' // -\r
TypeNumber=':' // :\r
TypeNull='_' // _\r
TypeDouble=',' // ,\r
TypeBoolean='#' // #t\r
or #f\r
TypeBlobError='!' // !\r
\r
TypeVerbatimString='=' //=\r
\r
TypeBigNumber='(' // (
// Aggregate data types
TypeArray='' // \r
... numelements other types ...
TypeMap='%' // %\r
... numelements key/value pair of other types ...
TypeSet='~' // ~\r
... numelements other types ...
TypeAttribute='|' // |~\r
... numelements map type ...
TypePush='>' // >\r
\r
... numelements-1 other types ...
//special type
TypeStream="$EOF:" // $EOF:<40 bytes marker>... any number of bytes of data here not containing the marker ...<40 bytes marker>
)
然后我们为所有的数据定义一个统一的模型,这样做的好处是我们可以使用统一的数据处理代码,避免复杂繁琐的数据类型:
type Valuestruct{
Type byte
Str string
StrFmt string
Err string
Integer int64
Boolean bool
Double float64
BigInt *big.Int
Elems []*Value // for array & set
KV *linkedhashmap.Map //TODO sorted map, for map & attr
Attrs *linkedhashmap.Map
StreamMarker string
}
Value 代表一个数据类型的值,它既可以是客户端发送的请求,也可以是服务器端的返回或者PUSH数据。
Type 是数据的类型标识,也就是那个特殊的字符。
对于不同的数据类型,只有部分的字段有意义,比如字符串类型,Str 字段有意义,Err 、Elems 、KV 等就没有意义了。Verbatim string 类型则Str 和StrFmt 有意义。 错误类型则Err 有意义。数组和Set类型Elems 有意义,Map和属性则KV 有意义。
NULL类型是没有值的,光靠Type就足够了。
因为RESP3中规定数据前面可以有多个属性,所以每个数据还都包含一个Attrs 字段,它表示这个数据的属性。
Stream类型只读取第一行,StreamMarker 字段表示它的40字节的标志符。
注意Map类型我们并没有使用Go标准库的Map,而是使用一个第三方的linkedhashmap.Map库,原因在于RESP3规范中约定Map类型是有序的,而标准库的Map是基于Hash的无序的Map,所以不合适。
我们可以把这种数据表示成Redis传输的字符串。 首先我们要读取属性,看看这个值前面是否有属性,如果有的话先把属性编码。属性是一个Map类型,按照Map类型的方式编码就可以了。注意数量是键值对的数量。接下来按照不同的类型进行编码。对于复杂类型,我们对于它包含的值递归调用编码方法即可。
func (r *Value) ToRESP3String() string {
buf :=new(strings.Builder)
//check attributes
if r.Attrs !=nil && r.Attrs.Size() >0 {
buf.WriteByte(TypeAttribute)
buf.WriteString(strconv.Itoa(r.Attrs.Size()))
buf.Write(CRLFByte)
r.Attrs.Each(func(key, val interface{}) {
k :=key.(*Value)
v :=val.(*Value)
buf.WriteByte(k.Type)
k.toRESP3String(buf)
buf.WriteByte(v.Type)
v.toRESP3String(buf)
})
}
buf.WriteByte(r.Type)
r.toRESP3String(buf)
return buf.String()
}
func (r Value) toRESP3String(buf strings.Builder) {
switch r.Type {
case TypeSimpleString:
buf.WriteString(r.Str)
case TypeBlobString:
......
case TypeArray, TypeSet, TypePush:
buf.WriteString(strconv.Itoa(len(r.Elems)))
buf.Write(CRLFByte)
for _, v :=range r.Elems {
buf.WriteByte(v.Type)
v.toRESP3String(buf)
}
return
......
}
}
有时候我们需要把数据返回成GO特性的类型,比如字符串、error、数组和Map等,我们也提供了一个SmartResult 的方法:
func (r *Value) SmartResult() interface{} {
switch r.Type {
case TypeSimpleString:
return r.Str
case TypeBlobString:
return r.Str
......
}
}
Reader
上面定义了一个统一的数据类型Value ,那么如何从一个连接中读取这个Value 呢? 我们需要一个Reader 类型。
type Reader struct {
*bufio.Reader
}
// NewReader returns a RESP3 reader.
func NewReader(reader io.Reader) *Reader {
return NewReaderSize(reader,32*1024)
}
// NewReaderSize returns a new Reader whose buffer has at least the specified size.
func NewReaderSize(reader io.Reader, size int) *Reader {
return &Reader{
Reader: bufio.NewReaderSize(reader, size),
}
}
我们定义了一个Reader ,你可以指定它的buffer大小,它的ReadValue 方法可以从reader中读取Value 对象,我们使用io.Reader 作为源而不是net.Conn ,是因为我们使用更通用的接口可以方便测试。
首先我们要检查是否有属性,如果有属性,先把属性读取,属性是二手数据类型,所以按照Map的方式处理就可以了。接下来读取真正的数据。
简单一行的数据比较好处理。比如简单字符串,简单error、数字、布尔值、空值等等,我们为每种类型都提供了一个独立的解析方法,便于管理。
func (r Reader) ReadValue() (Value, []byte, error) {
line, err :=r.readLine()
if err !=nil {
return nil, nil, err
}
if len(line) <3 {
return nil, nil, ErrInvalidSyntax
}
var attrs *linkedhashmap.Map
if line[0]==TypeAttribute {
attrs, err=r.readAttr(line)
if err !=nil {
return nil, nil, err
}
line, err=r.readLine()
}
// check stream. if it is stream, return the stream marker
if line[0]==TypeBlobString && len(line)==45 && bytes.Compare(line[:5], StreamMarkerPrefix)==0 {
return nil, line[5:], nil
}
v :=&Value{
Type: line[0],
Attrs: attrs,
}
switch v.Type {
case TypeSimpleString:
v.Str=string(line[1 : len(line)-2])
......
}
}
readLine 是读取一行的方法,Redis很多数据都是以回车换行符隔开的;getCount 从一行中读取数量值,比如数组的元素的数量等:
func (r *Reader) readLine() (line []byte, err error) {
line, err=r.ReadBytes('
')
if err !=nil {
return nil, err
}
if len(line) >1 && line[len(line)-2]=='\r' {
return line, nil
}
return nil, ErrInvalidSyntax
}
func (r *Reader) getCount(line []byte) (int, error) {
end :=bytes.IndexByte(line, '\r')
return strconv.Atoi(string(line[1:end]))
}
复杂类型的读取采用递归的方式解析,比如Map类型:
func (r Reader) readMap(line []byte) (linkedhashmap.Map, error) {
count, err :=r.getCount(line)
if err !=nil {
return nil, err
}
rt :=linkedhashmap.New()
for i :=0; i < count; i++ {
k, streamMarkerPrefix, err :=r.ReadValue()
if err=isError(err, streamMarkerPrefix); err !=nil {
return nil, err
}
v, streamMarkerPrefix, err :=r.ReadValue()
if err=isError(err, streamMarkerPrefix); err !=nil {
return nil, err
}
rt.Put(k, v)
}
return rt, nil
}
这样,我们一个底层的RESP3 Reader就实行了,你可以连接Redis 6.0 服务器, 然后从TCP连接中读取Value 返回值,根据不同的Type进行不同的处理,或者调用SmartResult 得到一个确定的值。
Writer
客户端的发送命令比较简单,因为发送的数据是一个字符串数组,所以编码成一个数组,数据的元素类型是字符串就可以。
type Writer struct {
*bufio.Writer
}
// NewWriter returns a redis client writer.
func NewWriter(writer io.Writer) *Writer {
return &Writer{
Writer: bufio.NewWriter(writer),
}
}
// WriteCommand write a redis command.
func (w *Writer) WriteCommand(args ...string) (err error) {
// write the array flag
w.WriteByte(TypeArray)
w.WriteString(strconv.Itoa(len(args)))
w.Write(CRLFByte)
// write blobstring
for _, arg :=range args {
w.WriteByte(TypeBlobString)
w.WriteString(strconv.Itoa(len(arg)))
w.Write(CRLFByte)
w.WriteString(arg)
w.Write(CRLFByte)
}
return w.Flush()
}
这样,一个完整的RESP3的读写器就完成了。 有什么用?
你可以使用它和redis进行通讯,它支持目前还没有发送的redis 6.0。 你也可以基于它实现一个redis的Go client库,支持最新的redis 6.0的client库,可以支持接收PUSH消息,实现pipeline的机制,接收流数据等等。
你也可以使用它实现一个类似Codis的proxy,其中协议的解析就不用写了,直接使用它就可以。
客户端缓存
为了帮助客户端实现缓存,尽可能和redis的数据保持一致,redis需要一些额外的改进,这些额外的改进称之为tracking 。
key空间整体被分为很多的哈希槽,redis 6.0使用24比特位作为CRC64的输出,所以会有1600多万个不同的哈希槽。哈希槽的多少是一个tradeoff, 多了占用太多的内存空间,太少又容易引起惊群的现象。如果你有1亿个key,而在客户端缓存中,收到一个失效消息不应该影响过多的key。Redis用于存储invalidation表的内存开销为130MiB,一个1600万个条目,每个条目8字节的数组。这对我来说没问题,如果你想要这个功能,你就要充分利用客户端所有的内存,所以使用130MB服务器端内存作为代价;你所赢得的是更细粒度的失效。
客户端连接到redis服务器之后,要想打开这个特性,需要发送:
CLIENT TRACKING on
服务端回复+OK 表示正常,同时从那时开始,每一个只读命令,除了返回key对应的数据给调用者以外,还有一个副作用,就是记录所有客户端请求的key的哈希槽(但仅仅是对只读命令的key)。Redis存储这些信息的方法很简单。每个Redis客户端都有一个惟一的ID,因此,如果ID为123的客户端执行一个MGET 命令,它的keys对应的哈希槽是1、2和5,我们将得到一个包含以下条目的无效表:
1 ->[123]
2 ->[123]
5 ->[123]
稍后,ID为888的客户端来请求哈希槽5中的key,所以这个表变为:
5 ->[123,888]
现在,其他一些客户端在哈希槽5中更改了一些key。发生的情况是,Redis将检查Invalidation表,发现客户端123和888可能都在该槽上缓存了key。我们会向客户发送一个失效消息,他们可以有多种处理方式:要么记住哈希槽最新的失效时间戳,然后在使用的时候才检查缓存对象的失效时间戳,如果超出这个时间戳,则删除这个key,这称为lazy的方式。或者,客户端可以获取关于这个哈希槽的所有缓存内容,然后直接删除哈希槽即可。这种使用24位散列函数的方法不是问题,即使缓存了数千万个key,我们也不会有很长的哈希槽记录。在发送失效消息之后,服务端会从Invalidation表中删除条目,这样服务端将不再向这些客户端发送失效消息,直到它们再次读取该哈希槽内的key。
客户端也也可以有一些自己的设计,比如使用20比特位记录哈希槽,做好服务器的24比特位和20比特币之间的对应就好。
Redis是通过Push消息将失效消息发送给客户端的。你也可以使用另外一个客户端(1234)负责接收失效消息:
CLIENT TRACKING on REDIRECT 1234
客户端ID可以通过CLIENT ID 请求获取。
接下来我们使用前面实现的读写器来验证TRACKING的功能。
// 首先连接一个redis 6.0服务器,只有redis 6.0的服务器才开始支持TRACKING
conn, err :=net.DialTimeout("tcp", "127.0.0.1:6379",5*time.Second)
if err !=nil {
t.Logf("can't found one of redis 6.0 server")
return
}
defer conn.Close()
w :=NewWriter(conn)
r :=NewReader(conn)
// 告诉redis采用RESP3的协议
w.WriteCommand("HELLO", "3")
helloResp, _, err :=r.ReadValue()
if err !=nil {
t.Fatalf("failed to send a HELLO 3")
}
if helloResp.KV.Size()==0 {
t.Fatalf("expect some info but got %+v", helloResp)
}
t.Logf("hello response: %c, %v", helloResp.Type, helloResp.SmartResult())
// 通知服务器开始追踪
w.WriteCommand("CLIENT", "TRACKING", "on")
resp, _, err :=r.ReadValue()
if err !=nil {
t.Fatalf("failed to TRACKING: %v", err)
}
t.Logf("TRACKING result: %c, %+v", resp.Type, resp.SmartResult())
// 请求一次,服务器应该计算出哈希槽,并且关联这个哈希槽和这个连接
w.WriteCommand("GET", "a")
resp, _, err=r.ReadValue()
if err !=nil {
t.Fatalf("failed to GET: %v", err)
}
t.Logf("GET result: %c, %+v", resp.Type, resp.SmartResult())
// 启动另外一个连接,模拟更新数据
go func() {
conn, err :=net.DialTimeout("tcp", "127.0.0.1:9999",5*time.Second)
if err !=nil {
t.Logf("can't found one of redis 6.0 server")
return
}
defer conn.Close()
w :=NewWriter(conn)
r :=NewReader(conn)
// 根据key计算出的哈希槽的哈希,服务器的PUSH消息应该推送这个槽的hash
hash :=crc64([]byte("a")) & (TRACKING_TABLE_SIZE -1)
t.Logf("calculated hash: %d", hash)
for i :=0; i <10; i++ {
// 模拟更新数据
w.WriteCommand("set", "a", strconv.Itoa(i))
resp, _, err=r.ReadValue()
if err !=nil {
t.Fatalf("failed to set: %v", err)
}
t.Logf("set result: %c, %+v", resp.Type, resp.SmartResult())
time.Sleep(200 * time.Millisecond)
}
}()
for i :=0; i <10; i++ {
// 读取一个PUSH数据
resp, _, err=r.ReadValue()
if err !=nil {
t.Fatalf("failed to receive a message: %v", err)
}
if resp.Type==TypePush && len(resp.Elems) >=2 && resp.Elems[0].SmartResult().(string)=="invalidate" {
t.Logf("received TRACKING result: %c, %+v", resp.Type, resp.SmartResult())
// 推送消息后,服务器就不再关联这个哈希槽和这个连接了,所以我们需要在拉取一次数据,以便继续跟踪
w.WriteCommand("GET", "a")
resp, _, err=r.ReadValue()
}
}