Golang实现redis系列-(3)封装RESP协议

本文涉及的产品
云数据库 Tair(兼容Redis),内存型 2GB
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
简介: Golang实现redis系列-(3)封装RESP协议

RESP

具体RESP协议不再展开,网上很多,这里给出核心代码与流程简图

/*
简单字符串:以"+" 开始【+OK\r\n】 (Simple String)
错误:     以"-" 开始【-ERR Invalid Syntax\r\n】 (Error)
整数:     以":" 开始【:1\r\n】 (Int)
字符串:    以 $  开始【$3\r\nSET\r\n】(Bulk String)
数组:     以 *  开始【*3\r\n$3\r\nSET\r\n$3\r\nwxf\r\n$5\r\n123\r\n】(Multi Bulk Strings)
*/

RESP是redis客户端与服务端进行通信的协议,所以在这里要对RESP进行封装,将一次命令封装成为一个reply,然后传递给channel,调用者通过读取channel就可以获得一次完整的命令

下面是流程图


代码

perser.go(核心)

package RESP
import (
  "bufio"
  "bytes"
  "errors"
  "io"
  "strconv"
  "strings"
)
// DataCache stores Reply or error
type DataCache struct {
  Data Reply
  Err  error
}
// ParseStream reads data from io.Reader and send payloads through channel
func ParseStream(reader io.Reader) <-chan *DataCache {
  DataChan := make(chan *DataCache)
  go parse0(reader, DataChan)
  return DataChan
}
// ParseBytes reads data from []byte and return all replies
func ParseBytes(data []byte) ([]Reply, error) {
  DataChan := make(chan *DataCache)
  reader := bytes.NewReader(data)
  go parse0(reader, DataChan)
  var results []Reply
  for DataCache := range DataChan {
    if DataCache == nil {
      return nil, errors.New("no reply")
    }
    if DataCache.Err != nil {
      if DataCache.Err == io.EOF {
        break
      } else {
        return nil, DataCache.Err
      }
    }
    results = append(results, DataCache.Data)
  }
  return results, nil
}
// ParseOne reads data from []byte and return the first DataCache
func ParseOne(data []byte) (Reply, error) {
  DataChan := make(chan *DataCache)
  reader := bytes.NewReader(data)
  go parse0(reader, DataChan)
  DataCache := <-DataChan
  if DataCache == nil {
    return nil, errors.New("no reply")
  }
  return DataCache.Data, DataCache.Err
}
type readState struct {
  readingMultiLine  bool //Need to read multiple lines
  expectedArgsCount int  //want to read args
  msgType           byte //type
  args              [][]byte
  bulkLen           int
}
func (r *readState) finished() bool {
  return r.expectedArgsCount > 0 && len(r.args) == r.expectedArgsCount
}
func parse0(reader io.Reader, ch chan *DataCache) {
  bufReader := bufio.NewReader(reader)
  var state readState
  for {
    // read one line
    msg, err := readLine(bufReader, &state) //Read a row including \r\n
    if err != nil {
      if err == io.EOF { //encounter io err, stop read
        ch <- &DataCache{Err: err}
        close(ch)
        return
      }
      ch <- &DataCache{Err: err}
      state = readState{}
      continue
    }
    // parse line
    if state.readingMultiLine == false { //The first time a command enters, it will be false
      switch msg[0] {
      case '*': //multi bulk reply
        err := parseMultiBulkHeader(msg, &state)
        if err != nil {
          ch <- &DataCache{Err: err}
          state = readState{} // reset state
          continue
        }
        if state.expectedArgsCount == 0 {
          ch <- &DataCache{
            Data: MakeEmptyMultiBulkReply(),
          }
          state = readState{} // reset state
          continue
        }
      case '$': // bulk reply
        err = parseBulkHeader(msg, &state)
        if err != nil {
          ch <- &DataCache{
            Err: err,
          }
          state = readState{} // reset state
          continue
        }
        if state.bulkLen == -1 { // null bulk reply
          ch <- &DataCache{
            Data: MakeNullBulkReply(),
          }
          state = readState{} // reset state
          continue
        }
      default: //single line reply
        result, err := parseSingleLineReply(msg)
        ch <- &DataCache{
          Data: result,
          Err:  err,
        }
        state = readState{} // reset state
        continue
      }
    } else if state.readingMultiLine == true {
      err = readBody(msg, &state)
      if err != nil {
        ch <- &DataCache{Err: err}
        state = readState{} // reset state
        continue
      }
      if state.finished() {
        var result Reply
        if state.msgType == '*' {
          result = MakeMultiBulkReply(state.args)
        } else if state.msgType == '$' {
          result = MakeBulkReply(state.args[0])
        }
        ch <- &DataCache{
          Data: result,
          Err:  err,
        }
        state = readState{}
      }
    }
  }
}
// readLine Read a row including \r\n
func readLine(bufReader *bufio.Reader, state *readState) ([]byte, error) {
  var msg []byte
  var err error
  if state.bulkLen == 0 { //Read an instruction for the first time
    msg, err = bufReader.ReadBytes('\n')
    if err != nil {
      err = io.EOF
      return nil, err
    }
    if len(msg) == 0 || msg[len(msg)-2] != '\r' {
      return nil, errors.New("protocol error: " + string(msg))
    }
  } else { //Read an instruction for the nth time
    msg = make([]byte, state.bulkLen+2)
    _, err := io.ReadFull(bufReader, msg)
    if err != nil {
      err = io.EOF
      return nil, err
    }
    if len(msg) == 0 || msg[len(msg)-2] != '\r' {
      return nil, errors.New("protocol error: " + string(msg))
    }
    state.bulkLen = 0
  }
  return msg, nil
}
// parseMultiBulkHeader parse Multi Bulk Header
func parseMultiBulkHeader(msg []byte, state *readState) error {
  expectedLine, err := strconv.Atoi(string(msg[1 : len(msg)-2]))
  if err != nil {
    return errors.New("protocol error: " + string(msg))
  }
  if expectedLine == 0 {
    state.expectedArgsCount = 0
    return nil
  } else {
    // first line of multi bulk reply
    state.msgType = msg[0]
    state.readingMultiLine = true
    state.expectedArgsCount = expectedLine
    state.args = make([][]byte, 0, expectedLine)
    return nil
  }
}
func parseBulkHeader(msg []byte, state *readState) error {
  var err error
  state.bulkLen, err = strconv.Atoi(string(msg[1 : len(msg)-2]))
  if err != nil {
    return errors.New("protocol error: " + string(msg))
  }
  if state.bulkLen == -1 {
    return nil
  } else {
    state.msgType = msg[0]
    state.readingMultiLine = true
    state.expectedArgsCount = 1
    state.args = make([][]byte, 0, 1)
    return nil
  }
}
func parseSingleLineReply(msg []byte) (Reply, error) {
  var result Reply
  str := strings.TrimSuffix(string(msg), "\r\n")
  switch msg[0] {
  case '+': // status reply
    result = MakeStatusReply(str[1:])
  case '-': // err reply
    result = MakeErrorReply(str[1:])
  case ':':
    val, _ := strconv.ParseInt(str[1:], 10, 64)
    result = MakeIntReply(val)
  default: //Information entered
    strs := strings.Split(str, " ")
    args := make([][]byte, len(strs))
    for i, s := range strs {
      args[i] = []byte(s)
    }
    result = MakeMultiBulkReply(args)
  }
  return result, nil
}
// read the non-first lines of multi bulk reply or bulk reply
func readBody(msg []byte, state *readState) error {
  line := msg[:len(msg)-2]
  if line[0] == '$' {
    // bulk reply
    var err error
    state.bulkLen, err = strconv.Atoi(string(line[1:]))
    if err != nil {
      return errors.New("protocol error: " + string(msg))
    }
    if state.bulkLen <= 0 { // null bulk in multi bulks
      state.args = append(state.args, []byte{})
      state.bulkLen = 0
    }
  } else {
    state.args = append(state.args, line)
  }
  return nil
}

reply.go

package RESP
import (
  "bytes"
  "strconv"
)
type Reply interface {
  ToBytes() []byte
}
var CRLF = "\r\n"
var nullBulkReplyBytes = []byte("$-1\r\n")
/*
简单字符串:以"+" 开始【+OK\r\n】 (Simple String)
错误:     以"-" 开始【-ERR Invalid Syntax\r\n】 (Error)
整数:     以":" 开始【:1\r\n】 (Int)
字符串:    以 $  开始【$3\r\nSET\r\n】(Bulk String)
数组:     以 *  开始【*3\r\n$3\r\nSET\r\n$3\r\nwxf\r\n$5\r\n123\r\n】(Multi Bulk Strings)
*/
/* ---- Status Reply ---- */
// StatusReply indicates the status,The server is used to return simple results
type StatusReply struct {
  Status string
}
// MakeStatusReply create a StatusReply
func MakeStatusReply(status string) *StatusReply {
  return &StatusReply{Status: status}
}
// ToBytes marshal StatusReply
func (s *StatusReply) ToBytes() []byte {
  return []byte("+" + s.Status + CRLF)
}
/* ---- Error Reply ---- */
// ErrorReply The server is used to return simple error messages
type ErrorReply struct {
  Err string
}
// MakeErrorReply create a ErrorReply
func MakeErrorReply(err string) *ErrorReply {
  return &ErrorReply{Err: err}
}
// ToBytes marshal ErrorReply
func (e *ErrorReply) ToBytes() []byte {
  return []byte("-" + e.Err + CRLF)
}
/* ---- Int Reply ---- */
// IntReply Is the return value of the command such as [strlen key],int64 type.
type IntReply struct {
  Num int64
}
// MakeIntReply create a IntReply
func MakeIntReply(num int64) *IntReply {
  return &IntReply{Num: num}
}
// ToBytes marshal IntReply
func (i *IntReply) ToBytes() []byte {
  return []byte(":" + strconv.FormatInt(i.Num, 10) + CRLF)
}
/* ---- Bulk Reply ---- */
// BulkReply Binary security string,For example, the return value of commands such as [get]
type BulkReply struct {
  Arg []byte
}
// MakeBulkReply create a Bulk String
func MakeBulkReply(arg []byte) *BulkReply {
  return &BulkReply{Arg: arg}
}
// ToBytes marshal BulkReply
func (b *BulkReply) ToBytes() []byte {
  if len(b.Arg) == 0 {
    return nullBulkReplyBytes
  }
  return []byte("$" + strconv.Itoa(len(b.Arg)) + CRLF + string(b.Arg) + CRLF)
}
/* ---- Multi Bulk Reply ---- */
// MultiBulkReply is Bulk string array, the format of commands sent by the client and command responses such as [keys *]
type MultiBulkReply struct {
  Args [][]byte
}
// MakeMultiBulkReply create a Bulk string array
func MakeMultiBulkReply(args [][]byte) *MultiBulkReply {
  return &MultiBulkReply{Args: args}
}
// ToBytes marshal MultiBulkReply
func (m *MultiBulkReply) ToBytes() []byte {
  var buf bytes.Buffer
  buf.WriteString("*" + strconv.Itoa(len(m.Args)) + CRLF)
  for _, arg := range m.Args {
    if arg == nil {
      buf.WriteString("$-1" + CRLF)
    } else {
      buf.WriteString("$" + strconv.Itoa(len(arg)) + CRLF + string(arg) + CRLF)
    }
  }
  return buf.Bytes()
}

perser_test.go

package RESP
import (
  "bytes"
  "fmt"
  "io"
  "testing"
)
func TestParseStream(t *testing.T) {
  replies := []Reply{
    MakeStatusReply("OK"),
    MakeIntReply(1),
    MakeErrorReply("ERR unknown"),
    MakeBulkReply([]byte("a\r\nb")), // test binary safe
    MakeMultiBulkReply([][]byte{
      []byte("set"),
      []byte("a\r\nb"),
      []byte("ok"),
    }),
  }
  reqs := bytes.Buffer{}
  for _, re := range replies {
    reqs.Write(re.ToBytes())
  }
  reqs.Write([]byte("set wxf 6872wxf" + CRLF))
  ch := ParseStream(bytes.NewReader(reqs.Bytes()))
  for DataCache := range ch {
    if DataCache.Err != nil {
      if DataCache.Err == io.EOF {
        fmt.Println("解析完成")
      }
      fmt.Println(DataCache.Err)
    } else {
      fmt.Println(string(DataCache.Data.ToBytes()))
    }
  }
}

custom.go

package RESP
// PongReply +PONG
type PongReply struct{}
// ToBytes marshal Reply
func (r *PongReply) ToBytes() []byte {
  return []byte("+PONG\r\n")
}
func MakePongReply() *PongReply {
  return &PongReply{}
}
// OkReply is +OK
type OkReply struct{}
// ToBytes marshal Reply
func (r *OkReply) ToBytes() []byte {
  return []byte("+OK\r\n")
}
func MakeOkReply() *OkReply {
  return &OkReply{}
}
// NullBulkReply is empty string
type NullBulkReply struct{}
// ToBytes marshal redis.Reply
func (r *NullBulkReply) ToBytes() []byte {
  return []byte("$-1\r\n")
}
// MakeNullBulkReply creates a new NullBulkReply
func MakeNullBulkReply() *NullBulkReply {
  return &NullBulkReply{}
}
// EmptyMultiBulkReply is a empty list
type EmptyMultiBulkReply struct{}
// ToBytes marshal redis.Reply
func (r *EmptyMultiBulkReply) ToBytes() []byte {
  return []byte("*0\r\n")
}
// MakeEmptyMultiBulkReply creates EmptyMultiBulkReply
func MakeEmptyMultiBulkReply() *EmptyMultiBulkReply {
  return &EmptyMultiBulkReply{}
}
// NoReply respond nothing, for commands like subscribe
type NoReply struct{}
var noBytes = []byte("")
// ToBytes marshal redis.Reply
func (r *NoReply) ToBytes() []byte {
  return noBytes
}
// QueuedReply is +QUEUED
type QueuedReply struct{}
// ToBytes marshal redis.Reply
func (r *QueuedReply) ToBytes() []byte {
  return []byte("+QUEUED\r\n")
}
// MakeQueuedReply returns a QUEUED reply
func MakeQueuedReply() *QueuedReply {
  return &QueuedReply{}
}


相关实践学习
基于Redis实现在线游戏积分排行榜
本场景将介绍如何基于Redis数据库实现在线游戏中的游戏玩家积分排行榜功能。
云数据库 Redis 版使用教程
云数据库Redis版是兼容Redis协议标准的、提供持久化的内存数据库服务,基于高可靠双机热备架构及可无缝扩展的集群架构,满足高读写性能场景及容量需弹性变配的业务需求。 产品详情:https://www.aliyun.com/product/kvstore &nbsp; &nbsp; ------------------------------------------------------------------------- 阿里云数据库体验:数据库上云实战 开发者云会免费提供一台带自建MySQL的源数据库&nbsp;ECS 实例和一台目标数据库&nbsp;RDS实例。跟着指引,您可以一步步实现将ECS自建数据库迁移到目标数据库RDS。 点击下方链接,领取免费ECS&amp;RDS资源,30分钟完成数据库上云实战!https://developer.aliyun.com/adc/scenario/51eefbd1894e42f6bb9acacadd3f9121?spm=a2c6h.13788135.J_3257954370.9.4ba85f24utseFl
目录
相关文章
|
6月前
|
NoSQL Redis
Redis原理之网络通信协议笔记
1. RESP协议 ​2. 自定义Socket连接Redis
|
6月前
|
物联网 Go 网络性能优化
使用Go语言(Golang)可以实现MQTT协议的点对点(P2P)消息发送。MQTT协议本身支持多种消息收发模式
使用Go语言(Golang)可以实现MQTT协议的点对点(P2P)消息发送。MQTT协议本身支持多种消息收发模式【1月更文挑战第21天】【1月更文挑战第104篇】
491 1
|
1月前
|
设计模式 NoSQL 网络协议
大数据-48 Redis 通信协议原理RESP 事件处理机制原理 文件事件 时间事件 Reactor多路复用
大数据-48 Redis 通信协议原理RESP 事件处理机制原理 文件事件 时间事件 Reactor多路复用
37 2
|
6月前
|
NoSQL 测试技术 Go
【Golang】国密SM2公钥私钥序列化到redis中并加密解密实战_sm2反编(1)
【Golang】国密SM2公钥私钥序列化到redis中并加密解密实战_sm2反编(1)
|
2月前
|
Go
Golang语言结构体(struct)面向对象编程进阶篇(封装,继承和多态)
这篇文章是关于Go语言中结构体(struct)面向对象编程进阶篇的教程,涵盖了Go语言如何实现封装、继承和多态,以及结构体内存布局的相关概念和案例。
152 4
|
3月前
|
NoSQL Redis 数据安全/隐私保护
[redis]定制封装redis的docker镜像
[redis]定制封装redis的docker镜像
|
4月前
|
Go 开发者
golang的http客户端封装
golang的http客户端封装
71 0
|
5月前
|
缓存 负载均衡 NoSQL
Redis系列学习文章分享---第十四篇(Redis多级缓存--封装Http请求+向tomcat发送http请求+根据商品id对tomcat集群负载均衡)
Redis系列学习文章分享---第十四篇(Redis多级缓存--封装Http请求+向tomcat发送http请求+根据商品id对tomcat集群负载均衡)
82 1
|
5月前
|
缓存 NoSQL Java
Redis系列学习文章分享---第四篇(Redis快速入门之Java客户端--商户查询缓存+更新+双写一致+穿透+雪崩+击穿+工具封装)
Redis系列学习文章分享---第四篇(Redis快速入门之Java客户端--商户查询缓存+更新+双写一致+穿透+雪崩+击穿+工具封装)
71 0
|
6月前
|
缓存 NoSQL Java
springboot中集成redis,二次封装成工具类
springboot中集成redis,二次封装成工具类
下一篇
无影云桌面