RESP
RESP 协议的具体细节这里不再展开(网上资料很多),下面只给出核心代码与流程简图。
/* 简单字符串:以"+" 开始【+OK\r\n】 (Simple String) 错误: 以"-" 开始【-ERR Invalid Syntax\r\n】 (Error) 整数: 以":" 开始【:1\r\n】 (Int) 字符串: 以 $ 开始【$3\r\nSET\r\n】(Bulk String) 数组: 以 * 开始【*3\r\n$3\r\nSET\r\n$3\r\nwxf\r\n$5\r\n123\r\n】(Multi Bulk Strings) */
RESP 是 Redis 客户端与服务端之间进行通信的协议,所以这里要对 RESP 进行封装:把一次完整的命令解析并封装成一个 reply,然后发送到 channel 中;调用者只需读取 channel,就可以获得一条完整的命令。
下面是流程图
代码
perser.go(核心)
package RESP

import (
	"bufio"
	"bytes"
	"errors"
	"io"
	"strconv"
	"strings"
)

// maxBulkLen is the largest bulk string length the parser accepts. It matches
// the default Redis limit (512 MB) and keeps a hostile header such as
// "$9999999999999" from forcing a huge allocation.
const maxBulkLen = 512 * 1024 * 1024

// maxArgsPrealloc bounds the capacity reserved up front for a multi bulk
// reply, so the declared element count alone cannot trigger a large
// allocation. The args slice still grows as elements actually arrive.
const maxArgsPrealloc = 1024

// DataCache stores one parsed Reply or the error met while parsing it.
type DataCache struct {
	Data Reply
	Err  error
}

// ParseStream reads RESP data from reader in a background goroutine and
// delivers each parsed payload through the returned channel.
//
// A payload whose Err is io.EOF marks the end of the stream; the channel is
// closed right after it. Protocol errors are delivered as payloads and
// parsing continues with the next line. The channel is unbuffered, so the
// goroutine blocks until the caller receives: callers should drain the
// channel until it is closed.
func ParseStream(reader io.Reader) <-chan *DataCache {
	ch := make(chan *DataCache)
	go parse0(reader, ch)
	return ch
}

// ParseBytes parses every reply contained in data.
//
// It returns the replies read before the end of data. If any payload carries
// a protocol error, it returns nil and that error. Parsing is done
// synchronously, so no goroutine is left behind on the error path.
func ParseBytes(data []byte) ([]Reply, error) {
	bufReader := bufio.NewReader(bytes.NewReader(data))
	var state readState
	var results []Reply
	for {
		payload := readPayload(bufReader, &state)
		if payload.Err != nil {
			if errors.Is(payload.Err, io.EOF) {
				return results, nil
			}
			return nil, payload.Err
		}
		results = append(results, payload.Data)
	}
}

// ParseOne parses data and returns only the first payload found in it.
//
// If data ends before a complete reply is read, the error is io.EOF. Parsing
// is done synchronously: the previous goroutine-based version blocked forever
// on its next channel send once the first payload had been taken.
func ParseOne(data []byte) (Reply, error) {
	bufReader := bufio.NewReader(bytes.NewReader(data))
	var state readState
	payload := readPayload(bufReader, &state)
	return payload.Data, payload.Err
}

// readState tracks the progress of the reply currently being assembled.
type readState struct {
	readingMultiLine  bool     // a header was read and more lines are needed
	expectedArgsCount int      // number of arguments the reply will contain
	msgType           byte     // '*' for multi bulk, '$' for bulk
	args              [][]byte // arguments collected so far
	bulkLen           int      // declared length of the pending bulk body
	readingBody       bool     // next read is a fixed-length bulk body
}

// finished reports whether all expected arguments have been collected.
func (r *readState) finished() bool {
	return r.expectedArgsCount > 0 && len(r.args) == r.expectedArgsCount
}

// protocolError builds the error reported for a malformed line.
func protocolError(msg []byte) error {
	return errors.New("protocol error: " + string(msg))
}

// parse0 parses reader until the end of input, sending every payload to ch.
// The io.EOF payload is always the last one sent; ch is closed afterwards.
func parse0(reader io.Reader, ch chan *DataCache) {
	defer close(ch)
	bufReader := bufio.NewReader(reader)
	var state readState
	for {
		payload := readPayload(bufReader, &state)
		ch <- payload
		if errors.Is(payload.Err, io.EOF) {
			return
		}
	}
}

// readPayload reads lines until one complete payload (a reply or an error) is
// available and returns it. state is reset whenever a payload is returned, so
// the next call starts a fresh reply.
func readPayload(bufReader *bufio.Reader, state *readState) *DataCache {
	for {
		msg, err := readLine(bufReader, state)
		if err != nil {
			*state = readState{}
			return &DataCache{Err: err}
		}

		if !state.readingMultiLine {
			// First line of a reply: dispatch on the type byte.
			switch msg[0] {
			case '*': // multi bulk reply
				if err := parseMultiBulkHeader(msg, state); err != nil {
					*state = readState{}
					return &DataCache{Err: err}
				}
				if state.expectedArgsCount == 0 {
					*state = readState{}
					return &DataCache{Data: MakeEmptyMultiBulkReply()}
				}
			case '$': // bulk reply
				if err := parseBulkHeader(msg, state); err != nil {
					*state = readState{}
					return &DataCache{Err: err}
				}
				if state.bulkLen == -1 { // null bulk reply
					*state = readState{}
					return &DataCache{Data: MakeNullBulkReply()}
				}
			default: // single line reply or inline command
				result, err := parseSingleLineReply(msg)
				*state = readState{}
				return &DataCache{Data: result, Err: err}
			}
			continue
		}

		// Subsequent line of a bulk or multi bulk reply.
		if err := readBody(msg, state); err != nil {
			*state = readState{}
			return &DataCache{Err: err}
		}
		if state.finished() {
			var result Reply
			switch state.msgType {
			case '*':
				result = MakeMultiBulkReply(state.args)
			case '$':
				result = MakeBulkReply(state.args[0])
			}
			*state = readState{}
			return &DataCache{Data: result}
		}
	}
}

// readLine reads the next unit of input, including its trailing "\r\n".
//
// When state.readingBody is set it reads exactly state.bulkLen+2 bytes, which
// keeps bulk bodies binary safe (they may contain "\r\n"). Otherwise it reads
// up to the next '\n'. The returned slice always has at least two bytes.
//
// Any read failure is reported as io.EOF because callers treat io.EOF as the
// signal to stop parsing; the underlying I/O error is not preserved.
func readLine(bufReader *bufio.Reader, state *readState) ([]byte, error) {
	if state.readingBody {
		msg := make([]byte, state.bulkLen+2)
		if _, err := io.ReadFull(bufReader, msg); err != nil {
			return nil, io.EOF
		}
		if msg[len(msg)-2] != '\r' || msg[len(msg)-1] != '\n' {
			return nil, protocolError(msg)
		}
		return msg, nil
	}

	msg, err := bufReader.ReadBytes('\n')
	if err != nil {
		return nil, io.EOF
	}
	// A lone "\n" has length 1, so the length must be checked before indexing.
	if len(msg) < 2 || msg[len(msg)-2] != '\r' {
		return nil, protocolError(msg)
	}
	return msg, nil
}

// parseMultiBulkHeader parses a "*<count>\r\n" line into state.
//
// A count of zero leaves state.expectedArgsCount at 0 so the caller can emit
// an empty multi bulk reply. Negative counts are rejected as protocol errors.
func parseMultiBulkHeader(msg []byte, state *readState) error {
	expectedLine, err := strconv.Atoi(string(msg[1 : len(msg)-2]))
	if err != nil || expectedLine < 0 {
		return protocolError(msg)
	}
	if expectedLine == 0 {
		state.expectedArgsCount = 0
		return nil
	}
	// first line of multi bulk reply
	state.msgType = msg[0]
	state.readingMultiLine = true
	state.expectedArgsCount = expectedLine
	capHint := expectedLine
	if capHint > maxArgsPrealloc {
		capHint = maxArgsPrealloc
	}
	state.args = make([][]byte, 0, capHint)
	return nil
}

// parseBulkHeader parses a top-level "$<len>\r\n" line into state.
//
// A length of -1 is the null bulk: state.bulkLen is set to -1 and nothing
// else changes. Lengths below -1 or above maxBulkLen are protocol errors.
// Otherwise the state is armed to read a body of exactly that length.
func parseBulkHeader(msg []byte, state *readState) error {
	bulkLen, err := strconv.Atoi(string(msg[1 : len(msg)-2]))
	if err != nil || bulkLen < -1 || bulkLen > maxBulkLen {
		return protocolError(msg)
	}
	state.bulkLen = bulkLen
	if bulkLen == -1 {
		return nil
	}
	state.msgType = msg[0]
	state.readingMultiLine = true
	state.expectedArgsCount = 1
	state.args = make([][]byte, 0, 1)
	state.readingBody = true
	return nil
}

// parseSingleLineReply parses a status ('+'), error ('-') or integer (':')
// line. Any other first byte is treated as an inline command whose
// space-separated words become the arguments of a multi bulk reply.
// msg must be non-empty. A malformed integer yields a protocol error.
func parseSingleLineReply(msg []byte) (Reply, error) {
	str := strings.TrimSuffix(string(msg), "\r\n")
	switch msg[0] {
	case '+': // status reply
		return MakeStatusReply(str[1:]), nil
	case '-': // err reply
		return MakeErrorReply(str[1:]), nil
	case ':': // int reply
		val, err := strconv.ParseInt(str[1:], 10, 64)
		if err != nil {
			return nil, protocolError(msg)
		}
		return MakeIntReply(val), nil
	default: // inline command
		strs := strings.Split(str, " ")
		args := make([][]byte, len(strs))
		for i, s := range strs {
			args[i] = []byte(s)
		}
		return MakeMultiBulkReply(args), nil
	}
}

// readBody consumes one non-first line of a bulk or multi bulk reply.
//
// If state.readingBody is set, msg is a bulk body and is stored verbatim,
// even when it starts with '$'. Otherwise msg is expected to be a
// "$<len>\r\n" element header: -1 appends a nil (null) argument, and any
// other valid length arms the state to read that many body bytes. A non-empty
// line that does not start with '$' is appended as an argument unchanged.
func readBody(msg []byte, state *readState) error {
	line := msg[:len(msg)-2]
	if state.readingBody {
		state.args = append(state.args, line)
		state.readingBody = false
		state.bulkLen = 0
		return nil
	}
	if len(line) == 0 {
		return protocolError(msg)
	}
	if line[0] != '$' {
		state.args = append(state.args, line)
		return nil
	}
	bulkLen, err := strconv.Atoi(string(line[1:]))
	if err != nil || bulkLen < -1 || bulkLen > maxBulkLen {
		return protocolError(msg)
	}
	if bulkLen == -1 { // null bulk in multi bulks
		state.args = append(state.args, nil)
		return nil
	}
	state.bulkLen = bulkLen
	state.readingBody = true
	return nil
}
reply.go
package RESP

import (
	"bytes"
	"strconv"
)

// Reply is a RESP message that can be serialized to its wire format.
type Reply interface {
	ToBytes() []byte
}

// CRLF is the RESP line terminator.
const CRLF = "\r\n"

var nullBulkReplyBytes = []byte("$-1\r\n")

/*
RESP wire formats:
  Simple String: starts with "+", e.g. +OK\r\n
  Error:         starts with "-", e.g. -ERR Invalid Syntax\r\n
  Integer:       starts with ":", e.g. :1\r\n
  Bulk String:   starts with "$", e.g. $3\r\nSET\r\n
  Array:         starts with "*", e.g. *3\r\n$3\r\nSET\r\n$3\r\nwxf\r\n$5\r\n123\r\n
*/

/* ---- Status Reply ---- */

// StatusReply is a simple string the server uses to return a short status.
type StatusReply struct {
	Status string
}

// MakeStatusReply creates a StatusReply.
func MakeStatusReply(status string) *StatusReply {
	return &StatusReply{Status: status}
}

// ToBytes marshals the StatusReply as "+<status>\r\n".
func (s *StatusReply) ToBytes() []byte {
	return []byte("+" + s.Status + CRLF)
}

/* ---- Error Reply ---- */

// ErrorReply is a simple error message returned by the server.
type ErrorReply struct {
	Err string
}

// MakeErrorReply creates an ErrorReply.
func MakeErrorReply(err string) *ErrorReply {
	return &ErrorReply{Err: err}
}

// ToBytes marshals the ErrorReply as "-<message>\r\n".
func (e *ErrorReply) ToBytes() []byte {
	return []byte("-" + e.Err + CRLF)
}

/* ---- Int Reply ---- */

// IntReply is an int64 result, e.g. the return value of [strlen key].
type IntReply struct {
	Num int64
}

// MakeIntReply creates an IntReply.
func MakeIntReply(num int64) *IntReply {
	return &IntReply{Num: num}
}

// ToBytes marshals the IntReply as ":<num>\r\n".
func (i *IntReply) ToBytes() []byte {
	return []byte(":" + strconv.FormatInt(i.Num, 10) + CRLF)
}

/* ---- Bulk Reply ---- */

// BulkReply is a binary-safe string, e.g. the return value of [get].
type BulkReply struct {
	Arg []byte
}

// MakeBulkReply creates a BulkReply.
func MakeBulkReply(arg []byte) *BulkReply {
	return &BulkReply{Arg: arg}
}

// ToBytes marshals the BulkReply as "$<len>\r\n<arg>\r\n".
//
// Only a nil Arg is the null bulk string ("$-1\r\n"). An empty but non-nil
// Arg is a real empty string and is written as "$0\r\n\r\n", the same rule
// MultiBulkReply.ToBytes applies to its elements.
func (b *BulkReply) ToBytes() []byte {
	if b.Arg == nil {
		return nullBulkReplyBytes
	}
	return []byte("$" + strconv.Itoa(len(b.Arg)) + CRLF + string(b.Arg) + CRLF)
}

/* ---- Multi Bulk Reply ---- */

// MultiBulkReply is an array of bulk strings. It is the format of commands
// sent by clients and of responses such as [keys *].
type MultiBulkReply struct {
	Args [][]byte
}

// MakeMultiBulkReply creates a MultiBulkReply.
func MakeMultiBulkReply(args [][]byte) *MultiBulkReply {
	return &MultiBulkReply{Args: args}
}

// ToBytes marshals the MultiBulkReply as "*<count>\r\n" followed by each
// argument as a bulk string. A nil argument is written as "$-1\r\n".
func (m *MultiBulkReply) ToBytes() []byte {
	var buf bytes.Buffer
	buf.WriteByte('*')
	buf.WriteString(strconv.Itoa(len(m.Args)))
	buf.WriteString(CRLF)
	for _, arg := range m.Args {
		if arg == nil {
			buf.WriteString("$-1" + CRLF)
			continue
		}
		buf.WriteByte('$')
		buf.WriteString(strconv.Itoa(len(arg)))
		buf.WriteString(CRLF)
		buf.Write(arg) // written directly, without an intermediate string copy
		buf.WriteString(CRLF)
	}
	return buf.Bytes()
}
perser_test.go
package RESP

import (
	"bytes"
	"io"
	"testing"
)

// TestParseStream serializes several replies plus one inline command into a
// single stream, parses the stream back, and checks that every payload
// re-serializes to the expected bytes and that the stream ends with io.EOF.
func TestParseStream(t *testing.T) {
	replies := []Reply{
		MakeStatusReply("OK"),
		MakeIntReply(1),
		MakeErrorReply("ERR unknown"),
		MakeBulkReply([]byte("a\r\nb")), // test binary safe
		MakeMultiBulkReply([][]byte{
			[]byte("set"),
			[]byte("a\r\nb"),
			[]byte("ok"),
		}),
	}

	var reqs bytes.Buffer
	want := make([][]byte, 0, len(replies)+1)
	for _, re := range replies {
		b := re.ToBytes()
		reqs.Write(b)
		want = append(want, b)
	}
	// An inline command is parsed into a multi bulk reply of its words.
	reqs.WriteString("set wxf 6872wxf" + CRLF)
	want = append(want, MakeMultiBulkReply([][]byte{
		[]byte("set"),
		[]byte("wxf"),
		[]byte("6872wxf"),
	}).ToBytes())

	ch := ParseStream(bytes.NewReader(reqs.Bytes()))
	got := 0
	sawEOF := false
	// Drain the channel completely, even after a failure, so the parsing
	// goroutine is never left blocked on a send.
	for payload := range ch {
		if payload.Err != nil {
			if payload.Err == io.EOF {
				sawEOF = true
				continue
			}
			t.Errorf("unexpected error: %v", payload.Err)
			continue
		}
		if got >= len(want) {
			t.Errorf("unexpected extra reply: %q", payload.Data.ToBytes())
			continue
		}
		if b := payload.Data.ToBytes(); !bytes.Equal(b, want[got]) {
			t.Errorf("reply %d: got %q, want %q", got, b, want[got])
		}
		got++
	}

	if got != len(want) {
		t.Errorf("parsed %d replies, want %d", got, len(want))
	}
	if !sawEOF {
		t.Error("stream ended without an io.EOF payload")
	}
}
custom.go
package RESP // PongReply +PONG type PongReply struct{} // ToBytes marshal Reply func (r *PongReply) ToBytes() []byte { return []byte("+PONG\r\n") } func MakePongReply() *PongReply { return &PongReply{} } // OkReply is +OK type OkReply struct{} // ToBytes marshal Reply func (r *OkReply) ToBytes() []byte { return []byte("+OK\r\n") } func MakeOkReply() *OkReply { return &OkReply{} } // NullBulkReply is empty string type NullBulkReply struct{} // ToBytes marshal redis.Reply func (r *NullBulkReply) ToBytes() []byte { return []byte("$-1\r\n") } // MakeNullBulkReply creates a new NullBulkReply func MakeNullBulkReply() *NullBulkReply { return &NullBulkReply{} } // EmptyMultiBulkReply is a empty list type EmptyMultiBulkReply struct{} // ToBytes marshal redis.Reply func (r *EmptyMultiBulkReply) ToBytes() []byte { return []byte("*0\r\n") } // MakeEmptyMultiBulkReply creates EmptyMultiBulkReply func MakeEmptyMultiBulkReply() *EmptyMultiBulkReply { return &EmptyMultiBulkReply{} } // NoReply respond nothing, for commands like subscribe type NoReply struct{} var noBytes = []byte("") // ToBytes marshal redis.Reply func (r *NoReply) ToBytes() []byte { return noBytes } // QueuedReply is +QUEUED type QueuedReply struct{} // ToBytes marshal redis.Reply func (r *QueuedReply) ToBytes() []byte { return []byte("+QUEUED\r\n") } // MakeQueuedReply returns a QUEUED reply func MakeQueuedReply() *QueuedReply { return &QueuedReply{} }