基于RESP编写高性能解析器
因为JDK
原生提供的字节缓冲区java.nio.ByteBuffer
存在不能自动扩容、需要切换读写模式等等问题,这里直接引入Netty
并且使用Netty
提供的ByteBuf
进行RESP
数据类型解析。编写本文的时候(2019-10-09)Netty
的最新版本为4.1.42.Final
。引入依赖:
<dependency> <groupId>io.netty</groupId> <artifactId>netty-buffer</artifactId> <version>4.1.42.Final</version> </dependency> 复制代码
定义解码器接口:
public interface RespDecoder<V>{ V decode(ByteBuf buffer); } 复制代码
定义常量:
/**
 * Constants shared by the RESP codec: charsets, type-marker bytes, line-terminator bytes
 * and the boxed values the decoders compare against.
 *
 * <p>This is a pure constants holder, so it is final and cannot be instantiated.
 */
public final class RespConstants {

    public static final Charset ASCII = StandardCharsets.US_ASCII;
    public static final Charset UTF_8 = StandardCharsets.UTF_8;

    /** Type marker of a bulk string. */
    public static final byte DOLLAR_BYTE = '$';
    /** Type marker of an array. */
    public static final byte ASTERISK_BYTE = '*';
    /** Type marker of a simple string. */
    public static final byte PLUS_BYTE = '+';
    /** Type marker of an error; also the minus sign of a negative integer. */
    public static final byte MINUS_BYTE = '-';
    /** Type marker of an integer. */
    public static final byte COLON_BYTE = ':';

    public static final String EMPTY_STRING = "";
    public static final Long ZERO = 0L;
    public static final Long NEGATIVE_ONE = -1L;

    public static final byte CR = (byte) '\r';
    public static final byte LF = (byte) '\n';
    /** Line terminator bytes. The array is shared: callers must not modify it. */
    public static final byte[] CRLF = "\r\n".getBytes(ASCII);

    /** The RESP data types the codec can dispatch on. */
    public enum ReplyType {
        SIMPLE_STRING,
        ERROR,
        INTEGER,
        BULK_STRING,
        RESP_ARRAY
    }

    private RespConstants() {
        // constants holder, not meant to be instantiated
    }
}
下面各章节中解码器的实现都不再处理第一个字节,因为第一个字节是类型标识符,用于决定具体的数据类型,在分发到具体解码器之前已经被读取。
解析简单字符串
简单字符串类型就是单行字符串,它的解析结果对应的就是Java
中的String
类型。解码器实现如下:
// 解析单行字符串 public class LineStringDecoder implements RespDecoder<String> { @Override public String decode(ByteBuf buffer) { return CodecUtils.X.readLine(buffer); } } public enum CodecUtils { X; public int findLineEndIndex(ByteBuf buffer) { int index = buffer.forEachByte(ByteProcessor.FIND_LF); return (index > 0 && buffer.getByte(index - 1) == '\r') ? index : -1; } public String readLine(ByteBuf buffer) { int lineEndIndex = findLineEndIndex(buffer); if (lineEndIndex > -1) { int lineStartIndex = buffer.readerIndex(); // 计算字节长度 int size = lineEndIndex - lineStartIndex - 1; byte[] bytes = new byte[size]; buffer.readBytes(bytes); // 重置读游标为\r\n之后的第一个字节 buffer.readerIndex(lineEndIndex + 1); buffer.markReaderIndex(); return new String(bytes, RespConstants.UTF_8); } return null; } } public class RespSimpleStringDecoder extends LineStringDecoder { } 复制代码
这里抽取出一个类LineStringDecoder
用于解析单行字符串,这样在解析错误消息的时候可以做一次继承即可。测试一下:
public static void main(String[] args) throws Exception { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); // +OK\r\n buffer.writeBytes("+OK".getBytes(RespConstants.UTF_8)); buffer.writeBytes(RespConstants.CRLF); String value = RespCodec.X.decode(buffer); log.info("Decode result:{}", value); } // Decode result:OK 复制代码
解析错误消息
错误消息的本质也是单行字符串,所以其解码的实现可以和简单字符串的解码实现一致。错误消息数据类型的解码器如下:
public class RespErrorDecoder extends LineStringDecoder { } 复制代码
测试一下:
public static void main(String[] args) throws Exception { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); // -ERR unknown command 'foobar'\r\n buffer.writeBytes("-ERR unknown command 'foobar'".getBytes(RespConstants.UTF_8)); buffer.writeBytes(RespConstants.CRLF); String value = RespCodec.X.decode(buffer); log.info("Decode result:{}", value); } // Decode result:ERR unknown command 'foobar' 复制代码
解析整型数字
整型数字类型,本质就是需要从字节序列中还原出带符号的64bit的长整型。因为是带符号的,需要判断类型标识位:后的第一个字节是否为负号字符-;因为是从左向右解析,每解析出一个新的位,当前的数字值要先乘10再加上该位的数值。其解码器的实现如下:
public class RespIntegerDecoder implements RespDecoder<Long> { @Override public Long decode(ByteBuf buffer) { int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer); // 没有行尾,异常 if (-1 == lineEndIndex) { return null; } long result = 0L; int lineStartIndex = buffer.readerIndex(); boolean negative = false; byte firstByte = buffer.getByte(lineStartIndex); // 负数 if (RespConstants.MINUS_BYTE == firstByte) { negative = true; } else { int digit = firstByte - '0'; result = result * 10 + digit; } for (int i = lineStartIndex + 1; i < (lineEndIndex - 1); i++) { byte value = buffer.getByte(i); int digit = value - '0'; result = result * 10 + digit; } if (negative) { result = -result; } // 重置读游标为\r\n之后的第一个字节 buffer.readerIndex(lineEndIndex + 1); return result; } } 复制代码
整型数字类型的解析相对复杂,一定要注意负数判断。测试一下:
public static void main(String[] args) throws Exception { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); // :-1000\r\n buffer.writeBytes(":-1000".getBytes(RespConstants.UTF_8)); buffer.writeBytes(RespConstants.CRLF); Long value = RespCodec.X.decode(buffer); log.info("Decode result:{}", value); } // Decode result:-1000 复制代码
解析定长字符串
定长字符串类型解析的关键是:先把类型标识符$后的第一个字节序列分块解析成64bit带符号的整数,用来确定后面需要解析的字符串内容的字节长度,然后再按照该长度读取后面的字节。其解码器实现如下:
public class RespBulkStringDecoder implements RespDecoder<String> { @Override public String decode(ByteBuf buffer) { int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer); if (-1 == lineEndIndex) { return null; } // 使用RespIntegerDecoder读取长度 Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer); if (null == length) { return null; } // Bulk Null String if (RespConstants.NEGATIVE_ONE.equals(length)) { return null; } // Bulk Empty String if (RespConstants.ZERO.equals(length)) { return RespConstants.EMPTY_STRING; } // 真实字节内容的长度 int readLength = (int) length.longValue(); if (buffer.readableBytes() > readLength) { byte[] bytes = new byte[readLength]; buffer.readBytes(bytes); // 重置读游标为\r\n之后的第一个字节 buffer.readerIndex(buffer.readerIndex() + 2); return new String(bytes, RespConstants.UTF_8); } return null; } } 复制代码
测试一下:
public static void main(String[] args) throws Exception{ ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); // $6\r\nthrowable\r\n buffer = ByteBufAllocator.DEFAULT.buffer(); buffer.writeBytes("$9".getBytes(RespConstants.UTF_8)); buffer.writeBytes(RespConstants.CRLF); buffer.writeBytes("throwable".getBytes(RespConstants.UTF_8)); buffer.writeBytes(RespConstants.CRLF); String value = RespCodec.X.decode(buffer); log.info("Decode result:{}", value); } // Decode result:throwable 复制代码
解析RESP数组
RESP
数组类型解析的关键:
- 先把类型标识符*后的第一个字节序列分块解析成64bit带符号的整数,确定数组中的元素个数。
- 递归解析每个元素。
参考过不少Redis
协议解析框架,不少是用栈或者状态机实现,这里先简单点用递归实现,解码器代码如下:
public class RespArrayDecoder implements RespDecoder { @Override public Object decode(ByteBuf buffer) { int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer); if (-1 == lineEndIndex) { return null; } // 解析元素个数 Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer); if (null == length) { return null; } // Null Array if (RespConstants.NEGATIVE_ONE.equals(length)) { return null; } // Array Empty List if (RespConstants.ZERO.equals(length)) { return Lists.newArrayList(); } List<Object> result = Lists.newArrayListWithCapacity((int) length.longValue()); // 递归 for (int i = 0; i < length; i++) { result.add(DefaultRespCodec.X.decode(buffer)); } return result; } } 复制代码
测试一下:
public static void main(String[] args) throws Exception { ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer(); //*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n buffer = ByteBufAllocator.DEFAULT.buffer(); buffer.writeBytes("*2".getBytes(RespConstants.UTF_8)); buffer.writeBytes(RespConstants.CRLF); buffer.writeBytes("$3".getBytes(RespConstants.UTF_8)); buffer.writeBytes(RespConstants.CRLF); buffer.writeBytes("foo".getBytes(RespConstants.UTF_8)); buffer.writeBytes(RespConstants.CRLF); buffer.writeBytes("$3".getBytes(RespConstants.UTF_8)); buffer.writeBytes(RespConstants.CRLF); buffer.writeBytes("bar".getBytes(RespConstants.UTF_8)); buffer.writeBytes(RespConstants.CRLF); List value = RespCodec.X.decode(buffer); log.info("Decode result:{}", value); } // Decode result:[foo, bar] 复制代码
小结
对RESP
的内容和其编码解码的过程有相对深刻的认识后,就可以基于Netty
编写Redis
服务的编码解码模块,作为Netty
入门的十分有意义的例子。本文的最后一节只演示了RESP
的解码部分,编码模块和更多细节会在另一篇用Netty
实现Redis
客户端的文章中展示。
参考资料:
链接
希望你能读到这里,然后发现我:
- Github Page:www.throwable.club/2019/10/09/…
- Coding Page:throwable.coding.me/2019/10/09/…
附录
本文涉及的所有代码:
public class RespConstants { public static final Charset ASCII = StandardCharsets.US_ASCII; public static final Charset UTF_8 = StandardCharsets.UTF_8; public static final byte DOLLAR_BYTE = '$'; public static final byte ASTERISK_BYTE = '*'; public static final byte PLUS_BYTE = '+'; public static final byte MINUS_BYTE = '-'; public static final byte COLON_BYTE = ':'; public static final String EMPTY_STRING = ""; public static final Long ZERO = 0L; public static final Long NEGATIVE_ONE = -1L; public static final byte CR = (byte) '\r'; public static final byte LF = (byte) '\n'; public static final byte[] CRLF = "\r\n".getBytes(ASCII); public enum ReplyType { SIMPLE_STRING, ERROR, INTEGER, BULK_STRING, RESP_ARRAY } } public enum CodecUtils { X; public int findLineEndIndex(ByteBuf buffer) { int index = buffer.forEachByte(ByteProcessor.FIND_LF); return (index > 0 && buffer.getByte(index - 1) == '\r') ? index : -1; } public String readLine(ByteBuf buffer) { int lineEndIndex = findLineEndIndex(buffer); if (lineEndIndex > -1) { int lineStartIndex = buffer.readerIndex(); // 计算字节长度 int size = lineEndIndex - lineStartIndex - 1; byte[] bytes = new byte[size]; buffer.readBytes(bytes); // 重置读游标为\r\n之后的第一个字节 buffer.readerIndex(lineEndIndex + 1); buffer.markReaderIndex(); return new String(bytes, RespConstants.UTF_8); } return null; } } public interface RespCodec { RespCodec X = DefaultRespCodec.X; <IN, OUT> OUT decode(ByteBuf buffer); <IN, OUT> ByteBuf encode(IN in); } public enum DefaultRespCodec implements RespCodec { X; static final Map<ReplyType, RespDecoder> DECODERS = Maps.newConcurrentMap(); private static final RespDecoder DEFAULT_DECODER = new DefaultRespDecoder(); static { DECODERS.put(ReplyType.SIMPLE_STRING, new RespSimpleStringDecoder()); DECODERS.put(ReplyType.ERROR, new RespErrorDecoder()); DECODERS.put(ReplyType.INTEGER, new RespIntegerDecoder()); DECODERS.put(ReplyType.BULK_STRING, new RespBulkStringDecoder()); DECODERS.put(ReplyType.RESP_ARRAY, new 
RespArrayDecoder()); } @SuppressWarnings("unchecked") @Override public <IN, OUT> OUT decode(ByteBuf buffer) { return (OUT) DECODERS.getOrDefault(determineReplyType(buffer), DEFAULT_DECODER).decode(buffer); } private ReplyType determineReplyType(ByteBuf buffer) { byte firstByte = buffer.readByte(); ReplyType replyType; switch (firstByte) { case RespConstants.PLUS_BYTE: replyType = ReplyType.SIMPLE_STRING; break; case RespConstants.MINUS_BYTE: replyType = ReplyType.ERROR; break; case RespConstants.COLON_BYTE: replyType = ReplyType.INTEGER; break; case RespConstants.DOLLAR_BYTE: replyType = ReplyType.BULK_STRING; break; case RespConstants.ASTERISK_BYTE: replyType = ReplyType.RESP_ARRAY; break; default: { throw new IllegalArgumentException("first byte:" + firstByte); } } return replyType; } @Override public <IN, OUT> ByteBuf encode(IN in) { // TODO throw new UnsupportedOperationException("encode"); } } public interface RespDecoder<V> { V decode(ByteBuf buffer); } public class DefaultRespDecoder implements RespDecoder { @Override public Object decode(ByteBuf buffer) { throw new IllegalStateException("decoder"); } } public class LineStringDecoder implements RespDecoder<String> { @Override public String decode(ByteBuf buffer) { return CodecUtils.X.readLine(buffer); } } public class RespSimpleStringDecoder extends LineStringDecoder { } public class RespErrorDecoder extends LineStringDecoder { } public class RespIntegerDecoder implements RespDecoder<Long> { @Override public Long decode(ByteBuf buffer) { int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer); // 没有行尾,异常 if (-1 == lineEndIndex) { return null; } long result = 0L; int lineStartIndex = buffer.readerIndex(); boolean negative = false; byte firstByte = buffer.getByte(lineStartIndex); // 负数 if (RespConstants.MINUS_BYTE == firstByte) { negative = true; } else { int digit = firstByte - '0'; result = result * 10 + digit; } for (int i = lineStartIndex + 1; i < (lineEndIndex - 1); i++) { byte value = buffer.getByte(i); 
int digit = value - '0'; result = result * 10 + digit; } if (negative) { result = -result; } // 重置读游标为\r\n之后的第一个字节 buffer.readerIndex(lineEndIndex + 1); return result; } } public class RespBulkStringDecoder implements RespDecoder<String> { @Override public String decode(ByteBuf buffer) { int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer); if (-1 == lineEndIndex) { return null; } Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer); if (null == length) { return null; } // Bulk Null String if (RespConstants.NEGATIVE_ONE.equals(length)) { return null; } // Bulk Empty String if (RespConstants.ZERO.equals(length)) { return RespConstants.EMPTY_STRING; } // 真实字节内容的长度 int readLength = (int) length.longValue(); if (buffer.readableBytes() > readLength) { byte[] bytes = new byte[readLength]; buffer.readBytes(bytes); // 重置读游标为\r\n之后的第一个字节 buffer.readerIndex(buffer.readerIndex() + 2); return new String(bytes, RespConstants.UTF_8); } return null; } } public class RespArrayDecoder implements RespDecoder { @Override public Object decode(ByteBuf buffer) { int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer); if (-1 == lineEndIndex) { return null; } // 解析元素个数 Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer); if (null == length) { return null; } // Null Array if (RespConstants.NEGATIVE_ONE.equals(length)) { return null; } // Array Empty List if (RespConstants.ZERO.equals(length)) { return Lists.newArrayList(); } List<Object> result = Lists.newArrayListWithCapacity((int) length.longValue()); // 递归 for (int i = 0; i < length; i++) { result.add(DefaultRespCodec.X.decode(buffer)); } return result; } }