读写 Redis RESP3 协议以及Redis 6.0客户端缓存

本文涉及的产品
云数据库 Redis 版,社区版 2GB
推荐场景:
搭建游戏排行榜
云原生内存数据库 Tair,内存型 2GB
公共DNS(含HTTPDNS解析),每月1000万次HTTP解析
简介:   在四月份的一篇翻译的文章中,我介绍了读写Redis RESP version 2的协议的Go 语言的实现,你可以使用它采用底层的方式读写5.0以及以下版本的Redis。Redis 6.0还在开发之中年底或者明年初就要发布了。Redis 6.0支持多线程I/O,还有客户端缓存。

  在四月份的一篇翻译的文章中,我介绍了读写Redis RESP version 2的协议的Go 语言的实现,你可以使用它采用底层的方式读写5.0以及以下版本的Redis。Redis 6.0还在开发之中年底或者明年初就要发布了。Redis 6.0支持多线程I/O,还有客户端缓存。

  客户端缓存是未来Redis最重要的特性。如果我们需要快速存储和快速缓存,那么我们就需要在客户端存储数据的子集。这是为了提供小延迟、大规模数据的想法的自然延伸。很多公司都采用了在客户端缓存数据以避免每次都请求redis,但是本地缓存和redis服务器数据之间有延迟,很难保证数据的一致性。Ben Malec在Redis Conf 2018上做了一个关于客户端缓存的演讲,给了Salvatore Sanfilippo以灵感,Salvatore Sanfilippo决定在Redis 6.0中支持客户端缓存的功能。但是为了支持这个功能,使用当前的redis协议很难实现,所以他设计了下一代的Redis协议:RESP3 。

  所有的代码都放在了github 上。

  RESP3协议

  RESP3 协议中增加了很多新的数据类型。

  和 RESP version 2 等价的类型

  Array: 一个有序 集合,包含N个其它类型Blob string: 二进制安全字符串Simple string: 一个节省空间的非二进制安全字符串Simple error: 一个节省空间的非二进制安全错误码和错误信息Number: 有符号64位整数

  RESP3中新加的类型

  Null: 单一的空值,代替原先的 RESP v2 的*-1 和$-1 空值。Double: 浮点数Boolean: 布尔类型 true / falseBlob error: 二进制安全的错误码和错误信息Verbatim string: 一个二进制安全字符串,带文本格式, 如命令LATENCY DOCTOR 的输出Map: 一个有序 的键值对Set: 一个无序 的不重复的集合Attribute: 类似Map类型Push: 带外数据,格式类似数组,但是客户端需要检查第一个数据,第一个数据指示了带外数据的类型。注意带外数据并不是一个reply,它是redis主动推送的数据,所以客户端收到带外数据,交给对应的处理方法去处理后,你还需要继续读取你的reply数据Hello: hello命令的返回结果,类似Map类型,仅仅在客户端和服务器建立连接的时候发送Big number: 大数字类型

  还有一种新加的stream类型,可以用来传送不确定具体长度的数据。在数据的开头有固定的标识符,在数据传输完毕后在加上这个40字节的标识符,40字节的标志符基本上不会和传输的数据有重复:

  $EOF:<40 bytes marker>

  ... any number of bytes of data here not containing the marker ...

  <40 bytes marker>

  它以$EOF: 开始接着是40字节标识符,回车换行,接着就是数据,数据结束后是40字节的标识符。因为这种数据类型开始接收时数据长度不确定,所以我们对这种数据类型处理比较特殊,我们会解析出第一行的40字节标志符,后续的读取以及结束标志符需要调用者自己去读取和验证。

  可以看到出了保持和原先的RESP version 2的数据类型一致外, RESP3的数据增加了很多数据类型。因为客户端发送的命令的格式比较简单,基本上是字符串的数组形式,所以这么多类型主要用在对服务器返回数据的发送和解析上。

  每种数据类型有一个特殊的字符作为标识flag ,我们为每个类型定义了一个常量:

  const (

  // simple types

  TypeBlobString='$' // $\r

  \r

  TypeSimpleString='+' // +\r

  TypeSimpleError='-' // -\r

  TypeNumber=':' // :\r

  TypeNull='_' // _\r

  TypeDouble=',' // ,\r

  TypeBoolean='#' // #t\r

  or #f\r

  TypeBlobError='!' // !\r

  \r

  TypeVerbatimString='=' //=\r

  \r

  TypeBigNumber='(' // (

  // Aggregate data types

  TypeArray='' // \r

  ... numelements other types ...

  TypeMap='%' // %\r

  ... numelements key/value pair of other types ...

  TypeSet='~' // ~\r

  ... numelements other types ...

  TypeAttribute='|' // |~\r

  ... numelements map type ...

  TypePush='>' // >\r

  \r

  ... numelements-1 other types ...

  //special type

  TypeStream="$EOF:" // $EOF:<40 bytes marker>... any number of bytes of data here not containing the marker ...<40 bytes marker>

  )

  然后我们为所有的数据定义一个统一的模型,这样做的好处是我们可以使用统一的数据处理代码,避免复杂繁琐的数据类型:

  type Valuestruct{

  Type byte

  Str string

  StrFmt string

  Err string

  Integer int64

  Boolean bool

  Double float64

  BigInt *big.Int

  Elems []*Value // for array & set

  KV *linkedhashmap.Map //TODO sorted map, for map & attr

  Attrs *linkedhashmap.Map

  StreamMarker string

  }

  Value 代表一个数据类型的值,它既可以是客户端发送的请求,也可以是服务器端的返回或者PUSH数据。

  Type 是数据的类型标识,也就是那个特殊的字符。

  对于不同的数据类型,只有部分的字段有意义,比如字符串类型,Str 字段有意义,Err 、Elems 、KV 等就没有意义了。Verbatim string 类型则Str 和StrFmt 有意义。 错误类型则Err 有意义。数组和Set类型Elems 有意义,Map和属性则KV 有意义。

  NULL类型是没有值的,光靠Type就足够了。

  因为RESP3中规定数据前面可以有多个属性,所以每个数据还都包含一个Attrs 字段,它表示这个数据的属性。

  Stream类型只读取第一行,StreamMarker 字段表示它的40字节的标志符。

  注意Map类型我们并没有使用Go标准库的Map,而是使用一个第三方的linkedhashmap.Map库,原因在于RESP3规范中约定Map类型是有序的,而标准库的Map是基于Hash的无序的Map,所以不合适。

  我们可以把这种数据表示成Redis传输的字符串。 首先我们要读取属性,看看这个值前面是否有属性,如果有的话先把属性编码。属性是一个Map类型,按照Map类型的方式编码就可以了。注意数量是键值对的数量。接下来按照不同的类型进行编码。对于复杂类型,我们对于它包含的值递归调用编码方法即可。

  func (r *Value) ToRESP3String() string {

  buf :=new(strings.Builder)

  //check attributes

  if r.Attrs !=nil && r.Attrs.Size() >0 {

  buf.WriteByte(TypeAttribute)

  buf.WriteString(strconv.Itoa(r.Attrs.Size()))

  buf.Write(CRLFByte)

  r.Attrs.Each(func(key, val interface{}) {

  k :=key.(*Value)

  v :=val.(*Value)

  buf.WriteByte(k.Type)

  k.toRESP3String(buf)

  buf.WriteByte(v.Type)

  v.toRESP3String(buf)

  })

  }

  buf.WriteByte(r.Type)

  r.toRESP3String(buf)

  return buf.String()

  }

  func (r Value) toRESP3String(buf strings.Builder) {

  switch r.Type {

  case TypeSimpleString:

  buf.WriteString(r.Str)

  case TypeBlobString:

  ......

  case TypeArray, TypeSet, TypePush:

  buf.WriteString(strconv.Itoa(len(r.Elems)))

  buf.Write(CRLFByte)

  for _, v :=range r.Elems {

  buf.WriteByte(v.Type)

  v.toRESP3String(buf)

  }

  return

  ......

  }

  }

  有时候我们需要把数据返回成GO特性的类型,比如字符串、error、数组和Map等,我们也提供了一个SmartResult 的方法:

  func (r *Value) SmartResult() interface{} {

  switch r.Type {

  case TypeSimpleString:

  return r.Str

  case TypeBlobString:

  return r.Str

  ......

  }

  }

  Reader

  上面定义了一个统一的数据类型Value ,那么如何从一个连接中读取这个Value 呢? 我们需要一个Reader 类型。

  type Reader struct {

  *bufio.Reader

  }

  // NewReader returns a RESP3 reader.

  func NewReader(reader io.Reader) *Reader {

  return NewReaderSize(reader,32*1024)

  }

  // NewReaderSize returns a new Reader whose buffer has at least the specified size.

  func NewReaderSize(reader io.Reader, size int) *Reader {

  return &Reader{

  Reader: bufio.NewReaderSize(reader, size),

  }

  }

  我们定义了一个Reader ,你可以指定它的buffer大小,它的ReadValue 方法可以从reader中读取Value 对象,我们使用io.Reader 作为源而不是net.Conn ,是因为我们使用更通用的接口可以方便测试。

  首先我们要检查是否有属性,如果有属性,先把属性读取,属性是二手数据类型,所以按照Map的方式处理就可以了。接下来读取真正的数据。

  简单一行的数据比较好处理。比如简单字符串,简单error、数字、布尔值、空值等等,我们为每种类型都提供了一个独立的解析方法,便于管理。

  func (r Reader) ReadValue() (Value, []byte, error) {

  line, err :=r.readLine()

  if err !=nil {

  return nil, nil, err

  }

  if len(line) <3 {

  return nil, nil, ErrInvalidSyntax

  }

  var attrs *linkedhashmap.Map

  if line[0]==TypeAttribute {

  attrs, err=r.readAttr(line)

  if err !=nil {

  return nil, nil, err

  }

  line, err=r.readLine()

  }

  // check stream. if it is stream, return the stream marker

  if line[0]==TypeBlobString && len(line)==45 && bytes.Compare(line[:5], StreamMarkerPrefix)==0 {

  return nil, line[5:], nil

  }

  v :=&Value{

  Type: line[0],

  Attrs: attrs,

  }

  switch v.Type {

  case TypeSimpleString:

  v.Str=string(line[1 : len(line)-2])

  ......

  }

  }

  readLine 是读取一行的方法,Redis很多数据都是以回车换行符隔开的;getCount 从一行中读取数量值,比如数组的元素的数量等:

  func (r *Reader) readLine() (line []byte, err error) {

  line, err=r.ReadBytes('

  ')

  if err !=nil {

  return nil, err

  }

  if len(line) >1 && line[len(line)-2]=='\r' {

  return line, nil

  }

  return nil, ErrInvalidSyntax

  }

  func (r *Reader) getCount(line []byte) (int, error) {

  end :=bytes.IndexByte(line, '\r')

  return strconv.Atoi(string(line[1:end]))

  }

  复杂类型的读取采用递归的方式解析,比如Map类型:

  func (r Reader) readMap(line []byte) (linkedhashmap.Map, error) {

  count, err :=r.getCount(line)

  if err !=nil {

  return nil, err

  }

  rt :=linkedhashmap.New()

  for i :=0; i < count; i++ {

  k, streamMarkerPrefix, err :=r.ReadValue()

  if err=isError(err, streamMarkerPrefix); err !=nil {

  return nil, err

  }

  v, streamMarkerPrefix, err :=r.ReadValue()

  if err=isError(err, streamMarkerPrefix); err !=nil {

  return nil, err

  }

  rt.Put(k, v)

  }

  return rt, nil

  }

  这样,我们一个底层的RESP3 Reader就实行了,你可以连接Redis 6.0 服务器, 然后从TCP连接中读取Value 返回值,根据不同的Type进行不同的处理,或者调用SmartResult 得到一个确定的值。

  Writer

  客户端的发送命令比较简单,因为发送的数据是一个字符串数组,所以编码成一个数组,数据的元素类型是字符串就可以。

  type Writer struct {

  *bufio.Writer

  }

  // NewWriter returns a redis client writer.

  func NewWriter(writer io.Writer) *Writer {

  return &Writer{

  Writer: bufio.NewWriter(writer),

  }

  }

  // WriteCommand write a redis command.

  func (w *Writer) WriteCommand(args ...string) (err error) {

  // write the array flag

  w.WriteByte(TypeArray)

  w.WriteString(strconv.Itoa(len(args)))

  w.Write(CRLFByte)

  // write blobstring

  for _, arg :=range args {

  w.WriteByte(TypeBlobString)

  w.WriteString(strconv.Itoa(len(arg)))

  w.Write(CRLFByte)

  w.WriteString(arg)

  w.Write(CRLFByte)

  }

  return w.Flush()

  }

  这样,一个完整的RESP3的读写器就完成了。 有什么用?

  你可以使用它和redis进行通讯,它支持目前还没有发送的redis 6.0。 你也可以基于它实现一个redis的Go client库,支持最新的redis 6.0的client库,可以支持接收PUSH消息,实现pipeline的机制,接收流数据等等。

  你也可以使用它实现一个类似Codis的proxy,其中协议的解析就不用写了,直接使用它就可以。

  客户端缓存

  为了帮助客户端实现缓存,尽可能和redis的数据保持一致,redis需要一些额外的改进,这些额外的改进称之为tracking 。

  key空间整体被分为很多的哈希槽,redis 6.0使用24比特位作为CRC64的输出,所以会有1600多万个不同的哈希槽。哈希槽的多少是一个tradeoff, 多了占用太多的内存空间,太少又容易引起惊群的现象。如果你有1亿个key,而在客户端缓存中,收到一个失效消息不应该影响过多的key。Redis用于存储invalidation表的内存开销为130MiB,一个1600万个条目,每个条目8字节的数组。这对我来说没问题,如果你想要这个功能,你就要充分利用客户端所有的内存,所以使用130MB服务器端内存作为代价;你所赢得的是更细粒度的失效。

  客户端连接到redis服务器之后,要想打开这个特性,需要发送:

  CLIENT TRACKING on

  服务端回复+OK 表示正常,同时从那时开始,每一个只读命令,除了返回key对应的数据给调用者以外,还有一个副作用,就是记录所有客户端请求的key的哈希槽(但仅仅是对只读命令的key)。Redis存储这些信息的方法很简单。每个Redis客户端都有一个惟一的ID,因此,如果ID为123的客户端执行一个MGET 命令,它的keys对应的哈希槽是1、2和5,我们将得到一个包含以下条目的无效表:

  1 ->[123]

  2 ->[123]

  5 ->[123]

  稍后,ID为888的客户端来请求哈希槽5中的key,所以这个表变为:

  5 ->[123,888]

  现在,其他一些客户端在哈希槽5中更改了一些key。发生的情况是,Redis将检查Invalidation表,发现客户端123和888可能都在该槽上缓存了key。我们会向客户发送一个失效消息,他们可以有多种处理方式:要么记住哈希槽最新的失效时间戳,然后在使用的时候才检查缓存对象的失效时间戳,如果超出这个时间戳,则删除这个key,这称为lazy的方式。或者,客户端可以获取关于这个哈希槽的所有缓存内容,然后直接删除哈希槽即可。这种使用24位散列函数的方法不是问题,即使缓存了数千万个key,我们也不会有很长的哈希槽记录。在发送失效消息之后,服务端会从Invalidation表中删除条目,这样服务端将不再向这些客户端发送失效消息,直到它们再次读取该哈希槽内的key。

  客户端也也可以有一些自己的设计,比如使用20比特位记录哈希槽,做好服务器的24比特位和20比特币之间的对应就好。

  Redis是通过Push消息将失效消息发送给客户端的。你也可以使用另外一个客户端(1234)负责接收失效消息:

  CLIENT TRACKING on REDIRECT 1234

  客户端ID可以通过CLIENT ID 请求获取。

  接下来我们使用前面实现的读写器来验证TRACKING的功能。

  // 首先连接一个redis 6.0服务器,只有redis 6.0的服务器才开始支持TRACKING

  conn, err :=net.DialTimeout("tcp", "127.0.0.1:6379",5*time.Second)

  if err !=nil {

  t.Logf("can't found one of redis 6.0 server")

  return

  }

  defer conn.Close()

  w :=NewWriter(conn)

  r :=NewReader(conn)

  // 告诉redis采用RESP3的协议

  w.WriteCommand("HELLO", "3")

  helloResp, _, err :=r.ReadValue()

  if err !=nil {

  t.Fatalf("failed to send a HELLO 3")

  }

  if helloResp.KV.Size()==0 {

  t.Fatalf("expect some info but got %+v", helloResp)

  }

  t.Logf("hello response: %c, %v", helloResp.Type, helloResp.SmartResult())

  // 通知服务器开始追踪

  w.WriteCommand("CLIENT", "TRACKING", "on")

  resp, _, err :=r.ReadValue()

  if err !=nil {

  t.Fatalf("failed to TRACKING: %v", err)

  }

  t.Logf("TRACKING result: %c, %+v", resp.Type, resp.SmartResult())

  // 请求一次,服务器应该计算出哈希槽,并且关联这个哈希槽和这个连接

  w.WriteCommand("GET", "a")

  resp, _, err=r.ReadValue()

  if err !=nil {

  t.Fatalf("failed to GET: %v", err)

  }

  t.Logf("GET result: %c, %+v", resp.Type, resp.SmartResult())

  // 启动另外一个连接,模拟更新数据

  go func() {

  conn, err :=net.DialTimeout("tcp", "127.0.0.1:9999",5*time.Second)

  if err !=nil {

  t.Logf("can't found one of redis 6.0 server")

  return

  }

  defer conn.Close()

  w :=NewWriter(conn)

  r :=NewReader(conn)

  // 根据key计算出的哈希槽的哈希,服务器的PUSH消息应该推送这个槽的hash

  hash :=crc64([]byte("a")) & (TRACKING_TABLE_SIZE -1)

  t.Logf("calculated hash: %d", hash)

  for i :=0; i <10; i++ {

  // 模拟更新数据

  w.WriteCommand("set", "a", strconv.Itoa(i))

  resp, _, err=r.ReadValue()

  if err !=nil {

  t.Fatalf("failed to set: %v", err)

  }

  t.Logf("set result: %c, %+v", resp.Type, resp.SmartResult())

  time.Sleep(200 * time.Millisecond)

  }

  }()

  for i :=0; i <10; i++ {

  // 读取一个PUSH数据

  resp, _, err=r.ReadValue()

  if err !=nil {

  t.Fatalf("failed to receive a message: %v", err)

  }

  if resp.Type==TypePush && len(resp.Elems) >=2 && resp.Elems[0].SmartResult().(string)=="invalidate" {

  t.Logf("received TRACKING result: %c, %+v", resp.Type, resp.SmartResult())

  // 推送消息后,服务器就不再关联这个哈希槽和这个连接了,所以我们需要在拉取一次数据,以便继续跟踪

  w.WriteCommand("GET", "a")

  resp, _, err=r.ReadValue()

  }

  }

相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
23天前
|
缓存 NoSQL Java
Redis 缓存与数据库数据不一致问题
Redis 缓存与数据库数据不一致问题
51 3
|
18天前
|
缓存 NoSQL 关系型数据库
(八)漫谈分布式之缓存篇:唠唠老生常谈的MySQL与Redis数据一致性问题!
本文来聊一个跟实际工作挂钩的老生常谈的问题:分布式系统中的缓存一致性。
72 10
|
20天前
|
缓存 NoSQL Serverless
函数计算产品使用问题之如何使用Redis作为缓存插件
阿里云Serverless 应用引擎(SAE)提供了完整的微服务应用生命周期管理能力,包括应用部署、服务治理、开发运维、资源管理等功能,并通过扩展功能支持多环境管理、API Gateway、事件驱动等高级应用场景,帮助企业快速构建、部署、运维和扩展微服务架构,实现Serverless化的应用部署与运维模式。以下是对SAE产品使用合集的概述,包括应用管理、服务治理、开发运维、资源管理等方面。
|
23天前
|
存储 缓存 NoSQL
Redis 缓存常见问题
Redis 缓存常见问题
31 3
|
5天前
|
存储 缓存 NoSQL
基于SpringBoot+Redis解决缓存与数据库一致性、缓存穿透、缓存雪崩、缓存击穿问题
这篇文章讨论了在使用SpringBoot和Redis时如何解决缓存与数据库一致性问题、缓存穿透、缓存雪崩和缓存击穿问题,并提供了相应的解决策略和示例代码。
16 0
|
5天前
|
缓存 NoSQL Ubuntu
如何在 Ubuntu 14.04 上配置 Redis 缓存以加速 WordPress
如何在 Ubuntu 14.04 上配置 Redis 缓存以加速 WordPress
10 0
|
1月前
|
canal 缓存 NoSQL
Redis常见面试题(一):Redis使用场景,缓存、分布式锁;缓存穿透、缓存击穿、缓存雪崩;双写一致,Canal,Redis持久化,数据过期策略,数据淘汰策略
Redis使用场景,缓存、分布式锁;缓存穿透、缓存击穿、缓存雪崩;先删除缓存还是先修改数据库,双写一致,Canal,Redis持久化,数据过期策略,数据淘汰策略
Redis常见面试题(一):Redis使用场景,缓存、分布式锁;缓存穿透、缓存击穿、缓存雪崩;双写一致,Canal,Redis持久化,数据过期策略,数据淘汰策略
|
26天前
|
消息中间件 缓存 数据库
Redis问题之如何解决缓存更新失败导致的数据不一致问题
Redis问题之如何解决缓存更新失败导致的数据不一致问题
|
26天前
|
缓存 NoSQL 数据库
Redis问题之在高并发场景下,保证Redis缓存和数据库的一致性如何解决
Redis问题之在高并发场景下,保证Redis缓存和数据库的一致性如何解决