一文彻底理解Redis序列化协议,你也可以编写Redis客户端(下)

本文涉及的产品
Redis 开源版,标准版 2GB
推荐场景:
搭建游戏排行榜
云解析DNS-重点域名监控,免费拨测 20万次(价值200元)
简介: 最近学习Netty的时候想做一个基于Redis服务协议的编码解码模块,过程中顺便阅读了Redis服务序列化协议RESP,结合自己的理解对文档进行了翻译并且简单实现了RESP基于Java语言的解析。编写本文的使用使用的JDK版本为[8+]。

基于RESP编写高性能解析器



因为JDK原生提供的字节缓冲区java.nio.ByteBuffer存在不能自动扩容、需要切换读写模式等等问题,这里直接引入Netty并且使用Netty提供的ByteBuf进行RESP数据类型解析。编写本文的时候(2019-10-09)Netty的最新版本为4.1.42.Final。引入依赖:


<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-buffer</artifactId>
    <version>4.1.42.Final</version>
</dependency>
复制代码


定义解码器接口:


public interface RespDecoder<V>{
    V decode(ByteBuf buffer);
}
复制代码


定义常量:


public class RespConstants {
    public static final Charset ASCII = StandardCharsets.US_ASCII;
    public static final Charset UTF_8 = StandardCharsets.UTF_8;
    public static final byte DOLLAR_BYTE = '$';
    public static final byte ASTERISK_BYTE = '*';
    public static final byte PLUS_BYTE = '+';
    public static final byte MINUS_BYTE = '-';
    public static final byte COLON_BYTE = ':';
    public static final String EMPTY_STRING = "";
    public static final Long ZERO = 0L;
    public static final Long NEGATIVE_ONE = -1L;
    public static final byte CR = (byte) '\r';
    public static final byte LF = (byte) '\n';
    public static final byte[] CRLF = "\r\n".getBytes(ASCII);
    public enum ReplyType {
        SIMPLE_STRING,
        ERROR,
        INTEGER,
        BULK_STRING,
        RESP_ARRAY
    }
}
复制代码


下面的章节中解析模块的实现已经忽略第一个字节的解析,因为第一个字节是决定具体的数据类型。


解析简单字符串


简单字符串类型就是单行字符串,它的解析结果对应的就是Java中的String类型。解码器实现如下:


// 解析单行字符串
public class LineStringDecoder implements RespDecoder<String> {
    @Override
    public String decode(ByteBuf buffer) {
        return CodecUtils.X.readLine(buffer);
    }
}
public enum CodecUtils {
    X;
    public int findLineEndIndex(ByteBuf buffer) {
        int index = buffer.forEachByte(ByteProcessor.FIND_LF);
        return (index > 0 && buffer.getByte(index - 1) == '\r') ? index : -1;
    }
    public String readLine(ByteBuf buffer) {
        int lineEndIndex = findLineEndIndex(buffer);
        if (lineEndIndex > -1) {
            int lineStartIndex = buffer.readerIndex();
            // 计算字节长度
            int size = lineEndIndex - lineStartIndex - 1;
            byte[] bytes = new byte[size];
            buffer.readBytes(bytes);
            // 重置读游标为\r\n之后的第一个字节
            buffer.readerIndex(lineEndIndex + 1);
            buffer.markReaderIndex();
            return new String(bytes, RespConstants.UTF_8);
        }
        return null;
    }
}
public class RespSimpleStringDecoder extends LineStringDecoder {
}
复制代码


这里抽取出一个类LineStringDecoder用于解析单行字符串,这样在解析错误消息的时候可以做一次继承即可。测试一下:


public static void main(String[] args) throws Exception {
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    // +OK\r\n
    buffer.writeBytes("+OK".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    String value = RespCodec.X.decode(buffer);
    log.info("Decode result:{}", value);
}
// Decode result:OK
复制代码


解析错误消息


错误消息的本质也是单行字符串,所以其解码的实现可以和简单字符串的解码实现一致。错误消息数据类型的解码器如下:


public class RespErrorDecoder extends LineStringDecoder {
}
复制代码


测试一下:


public static void main(String[] args) throws Exception {
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    // -ERR unknown command 'foobar'\r\n
    buffer.writeBytes("-ERR unknown command 'foobar'".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    String value = RespCodec.X.decode(buffer);
    log.info("Decode result:{}", value);
}
// Decode result:ERR unknown command 'foobar'
复制代码


解析整型数字


整型数字类型,本质就是需要从字节序列中还原出带符号的64bit的长整型,因为是带符号的,类型标识位:后的第一个字节需要判断是否负数字符-,因为是从左向右解析,然后每解析出一个新的位,当前的数字值要乘10。其解码器的实现如下:


public class RespIntegerDecoder implements RespDecoder<Long> {
    @Override
    public Long decode(ByteBuf buffer) {
        int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
        // 没有行尾,异常
        if (-1 == lineEndIndex) {
            return null;
        }
        long result = 0L;
        int lineStartIndex = buffer.readerIndex();
        boolean negative = false;
        byte firstByte = buffer.getByte(lineStartIndex);
        // 负数
        if (RespConstants.MINUS_BYTE == firstByte) {
            negative = true;
        } else {
            int digit = firstByte - '0';
            result = result * 10 + digit;
        }
        for (int i = lineStartIndex + 1; i < (lineEndIndex - 1); i++) {
            byte value = buffer.getByte(i);
            int digit = value - '0';
            result = result * 10 + digit;
        }
        if (negative) {
            result = -result;
        }
        // 重置读游标为\r\n之后的第一个字节
        buffer.readerIndex(lineEndIndex + 1);
        return result;
    }
}
复制代码


整型数字类型的解析相对复杂,一定要注意负数判断。测试一下:


public static void main(String[] args) throws Exception {
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    // :-1000\r\n
    buffer.writeBytes(":-1000".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    Long value = RespCodec.X.decode(buffer);
    log.info("Decode result:{}", value);
}
// Decode result:-1000
复制代码


解析定长字符串


定长字符串类型解析的关键是先读取类型标识符$后的第一个字节序列分块解析成64bit带符号的整数,用来确定后面需要解析的字符串内容的字节长度,然后再按照该长度读取后面的字节。其解码器实现如下:


public class RespBulkStringDecoder implements RespDecoder<String> {
    @Override
    public String decode(ByteBuf buffer) {
        int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
        if (-1 == lineEndIndex) {
            return null;
        }
        // 使用RespIntegerDecoder读取长度
        Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
        if (null == length) {
            return null;
        }
        // Bulk Null String
        if (RespConstants.NEGATIVE_ONE.equals(length)) {
            return null;
        }
        // Bulk Empty String
        if (RespConstants.ZERO.equals(length)) {
            return RespConstants.EMPTY_STRING;
        }
        // 真实字节内容的长度
        int readLength = (int) length.longValue();
        if (buffer.readableBytes() > readLength) {
            byte[] bytes = new byte[readLength];
            buffer.readBytes(bytes);
            // 重置读游标为\r\n之后的第一个字节
            buffer.readerIndex(buffer.readerIndex() + 2);
            return new String(bytes, RespConstants.UTF_8);
        }
        return null;
    }
}
复制代码


测试一下:


public static void main(String[] args) throws Exception{
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    // $6\r\nthrowable\r\n
    buffer = ByteBufAllocator.DEFAULT.buffer();
    buffer.writeBytes("$9".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    buffer.writeBytes("throwable".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    String value = RespCodec.X.decode(buffer);
    log.info("Decode result:{}", value);
}
// Decode result:throwable
复制代码


解析RESP数组


RESP数组类型解析的关键:


  • 先读取类型标识符*后的第一个字节序列分块解析成64bit带符号的整数,确定数组中的元素个数。
  • 递归解析每个元素。


参考过不少Redis协议解析框架,不少是用栈或者状态机实现,这里先简单点用递归实现,解码器代码如下:


public class RespArrayDecoder implements RespDecoder {
    @Override
    public Object decode(ByteBuf buffer) {
        int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
        if (-1 == lineEndIndex) {
            return null;
        }
        // 解析元素个数
        Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
        if (null == length) {
            return null;
        }
        // Null Array
        if (RespConstants.NEGATIVE_ONE.equals(length)) {
            return null;
        }
        // Array Empty List
        if (RespConstants.ZERO.equals(length)) {
            return Lists.newArrayList();
        }
        List<Object> result = Lists.newArrayListWithCapacity((int) length.longValue());
        // 递归
        for (int i = 0; i < length; i++) {
            result.add(DefaultRespCodec.X.decode(buffer));
        }
        return result;
    }
}
复制代码


测试一下:


public static void main(String[] args) throws Exception {
    ByteBuf buffer = ByteBufAllocator.DEFAULT.buffer();
    //*2\r\n$3\r\nfoo\r\n$3\r\nbar\r\n
    buffer = ByteBufAllocator.DEFAULT.buffer();
    buffer.writeBytes("*2".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    buffer.writeBytes("$3".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    buffer.writeBytes("foo".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    buffer.writeBytes("$3".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    buffer.writeBytes("bar".getBytes(RespConstants.UTF_8));
    buffer.writeBytes(RespConstants.CRLF);
    List value = RespCodec.X.decode(buffer);
    log.info("Decode result:{}", value);
}
// Decode result:[foo, bar]
复制代码


小结



RESP的内容和其编码解码的过程有相对深刻的认识后,就可以基于Netty编写Redis服务的编码解码模块,作为Netty入门的十分有意义的例子。本文的最后一节只演示了RESP的解码部分,编码模块和更多细节会在另一篇用Netty实现Redis客户端的文章中展示。


参考资料:


链接



希望你能读到这里,然后发现我:


附录



本文涉及的所有代码:


public class RespConstants {
    public static final Charset ASCII = StandardCharsets.US_ASCII;
    public static final Charset UTF_8 = StandardCharsets.UTF_8;
    public static final byte DOLLAR_BYTE = '$';
    public static final byte ASTERISK_BYTE = '*';
    public static final byte PLUS_BYTE = '+';
    public static final byte MINUS_BYTE = '-';
    public static final byte COLON_BYTE = ':';
    public static final String EMPTY_STRING = "";
    public static final Long ZERO = 0L;
    public static final Long NEGATIVE_ONE = -1L;
    public static final byte CR = (byte) '\r';
    public static final byte LF = (byte) '\n';
    public static final byte[] CRLF = "\r\n".getBytes(ASCII);
    public enum ReplyType {
        SIMPLE_STRING,
        ERROR,
        INTEGER,
        BULK_STRING,
        RESP_ARRAY
    }
}
public enum CodecUtils {
    X;
    public int findLineEndIndex(ByteBuf buffer) {
        int index = buffer.forEachByte(ByteProcessor.FIND_LF);
        return (index > 0 && buffer.getByte(index - 1) == '\r') ? index : -1;
    }
    public String readLine(ByteBuf buffer) {
        int lineEndIndex = findLineEndIndex(buffer);
        if (lineEndIndex > -1) {
            int lineStartIndex = buffer.readerIndex();
            // 计算字节长度
            int size = lineEndIndex - lineStartIndex - 1;
            byte[] bytes = new byte[size];
            buffer.readBytes(bytes);
            // 重置读游标为\r\n之后的第一个字节
            buffer.readerIndex(lineEndIndex + 1);
            buffer.markReaderIndex();
            return new String(bytes, RespConstants.UTF_8);
        }
        return null;
    }
}
public interface RespCodec {
    RespCodec X = DefaultRespCodec.X;
    <IN, OUT> OUT decode(ByteBuf buffer);
    <IN, OUT> ByteBuf encode(IN in);
}
public enum DefaultRespCodec implements RespCodec {
    X;
    static final Map<ReplyType, RespDecoder> DECODERS = Maps.newConcurrentMap();
    private static final RespDecoder DEFAULT_DECODER = new DefaultRespDecoder();
    static {
        DECODERS.put(ReplyType.SIMPLE_STRING, new RespSimpleStringDecoder());
        DECODERS.put(ReplyType.ERROR, new RespErrorDecoder());
        DECODERS.put(ReplyType.INTEGER, new RespIntegerDecoder());
        DECODERS.put(ReplyType.BULK_STRING, new RespBulkStringDecoder());
        DECODERS.put(ReplyType.RESP_ARRAY, new RespArrayDecoder());
    }
    @SuppressWarnings("unchecked")
    @Override
    public <IN, OUT> OUT decode(ByteBuf buffer) {
        return (OUT) DECODERS.getOrDefault(determineReplyType(buffer), DEFAULT_DECODER).decode(buffer);
    }
    private ReplyType determineReplyType(ByteBuf buffer) {
        byte firstByte = buffer.readByte();
        ReplyType replyType;
        switch (firstByte) {
            case RespConstants.PLUS_BYTE:
                replyType = ReplyType.SIMPLE_STRING;
                break;
            case RespConstants.MINUS_BYTE:
                replyType = ReplyType.ERROR;
                break;
            case RespConstants.COLON_BYTE:
                replyType = ReplyType.INTEGER;
                break;
            case RespConstants.DOLLAR_BYTE:
                replyType = ReplyType.BULK_STRING;
                break;
            case RespConstants.ASTERISK_BYTE:
                replyType = ReplyType.RESP_ARRAY;
                break;
            default: {
                throw new IllegalArgumentException("first byte:" + firstByte);
            }
        }
        return replyType;
    }
    @Override
    public <IN, OUT> ByteBuf encode(IN in) {
        // TODO
        throw new UnsupportedOperationException("encode");
    }
}
public interface RespDecoder<V> {
    V decode(ByteBuf buffer);
}
public class DefaultRespDecoder implements RespDecoder {
    @Override
    public Object decode(ByteBuf buffer) {
        throw new IllegalStateException("decoder");
    }
}
public class LineStringDecoder implements RespDecoder<String> {
    @Override
    public String decode(ByteBuf buffer) {
        return CodecUtils.X.readLine(buffer);
    }
}
public class RespSimpleStringDecoder extends LineStringDecoder {
}
public class RespErrorDecoder extends LineStringDecoder {
}
public class RespIntegerDecoder implements RespDecoder<Long> {
    @Override
    public Long decode(ByteBuf buffer) {
        int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
        // 没有行尾,异常
        if (-1 == lineEndIndex) {
            return null;
        }
        long result = 0L;
        int lineStartIndex = buffer.readerIndex();
        boolean negative = false;
        byte firstByte = buffer.getByte(lineStartIndex);
        // 负数
        if (RespConstants.MINUS_BYTE == firstByte) {
            negative = true;
        } else {
            int digit = firstByte - '0';
            result = result * 10 + digit;
        }
        for (int i = lineStartIndex + 1; i < (lineEndIndex - 1); i++) {
            byte value = buffer.getByte(i);
            int digit = value - '0';
            result = result * 10 + digit;
        }
        if (negative) {
            result = -result;
        }
        // 重置读游标为\r\n之后的第一个字节
        buffer.readerIndex(lineEndIndex + 1);
        return result;
    }
}
public class RespBulkStringDecoder implements RespDecoder<String> {
    @Override
    public String decode(ByteBuf buffer) {
        int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
        if (-1 == lineEndIndex) {
            return null;
        }
        Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
        if (null == length) {
            return null;
        }
        // Bulk Null String
        if (RespConstants.NEGATIVE_ONE.equals(length)) {
            return null;
        }
        // Bulk Empty String
        if (RespConstants.ZERO.equals(length)) {
            return RespConstants.EMPTY_STRING;
        }
        // 真实字节内容的长度
        int readLength = (int) length.longValue();
        if (buffer.readableBytes() > readLength) {
            byte[] bytes = new byte[readLength];
            buffer.readBytes(bytes);
            // 重置读游标为\r\n之后的第一个字节
            buffer.readerIndex(buffer.readerIndex() + 2);
            return new String(bytes, RespConstants.UTF_8);
        }
        return null;
    }
}
public class RespArrayDecoder implements RespDecoder {
    @Override
    public Object decode(ByteBuf buffer) {
        int lineEndIndex = CodecUtils.X.findLineEndIndex(buffer);
        if (-1 == lineEndIndex) {
            return null;
        }
        // 解析元素个数
        Long length = (Long) DefaultRespCodec.DECODERS.get(ReplyType.INTEGER).decode(buffer);
        if (null == length) {
            return null;
        }
        // Null Array
        if (RespConstants.NEGATIVE_ONE.equals(length)) {
            return null;
        }
        // Array Empty List
        if (RespConstants.ZERO.equals(length)) {
            return Lists.newArrayList();
        }
        List<Object> result = Lists.newArrayListWithCapacity((int) length.longValue());
        // 递归
        for (int i = 0; i < length; i++) {
            result.add(DefaultRespCodec.X.decode(buffer));
        }
        return result;
    }
}


相关文章
|
NoSQL Redis 数据安全/隐私保护
Redis 最流行的图形化界面下载及使用超详细教程(带安装包)! redis windows客户端下载
文章提供了Redis最流行的图形化界面工具Another Redis Desktop Manager的下载及使用教程,包括如何下载、解压、连接Redis服务器以及使用控制台和查看数据类型详细信息。
3268 6
Redis 最流行的图形化界面下载及使用超详细教程(带安装包)! redis windows客户端下载
|
NoSQL Redis 数据库
Redis 图形化界面下载及使用超详细教程(带安装包)! redis windows下客户端下载
文章提供了Redis图形化界面工具的下载及使用教程,包括如何连接本地Redis服务器、操作键值对、查看日志和使用命令行等功能。
2446 0
Redis 图形化界面下载及使用超详细教程(带安装包)! redis windows下客户端下载
|
JSON NoSQL Java
redis的java客户端的使用(Jedis、SpringDataRedis、SpringBoot整合redis、redisTemplate序列化及stringRedisTemplate序列化)
这篇文章介绍了在Java中使用Redis客户端的几种方法,包括Jedis、SpringDataRedis和SpringBoot整合Redis的操作。文章详细解释了Jedis的基本使用步骤,Jedis连接池的创建和使用,以及在SpringBoot项目中如何配置和使用RedisTemplate和StringRedisTemplate。此外,还探讨了RedisTemplate序列化的两种实践方案,包括默认的JDK序列化和自定义的JSON序列化,以及StringRedisTemplate的使用,它要求键和值都必须是String类型。
redis的java客户端的使用(Jedis、SpringDataRedis、SpringBoot整合redis、redisTemplate序列化及stringRedisTemplate序列化)
|
NoSQL 网络协议 算法
Redis 客户端连接
10月更文挑战第21天
179 1
|
存储 消息中间件 NoSQL
Redis 入门 - C#.NET Core客户端库六种选择
Redis 入门 - C#.NET Core客户端库六种选择
548 8
|
JSON 缓存 NoSQL
Redis 在线查看序列化对象技术详解
Redis 在线查看序列化对象技术详解
265 3
|
JSON 缓存 NoSQL
redis序列化数据时,如何包含clsss类型信息?
通过配置 `com.fasterxml.jackson.databind.ObjectMapper` 的 `enableDefaultTyping` 方法,可以使序列化后的 JSON 包含类信息。
249 2
|
7月前
|
缓存 NoSQL 关系型数据库
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
美团面试:MySQL有1000w数据,redis只存20w的数据,如何做 缓存 设计?
|
2月前
|
缓存 负载均衡 监控
135_负载均衡:Redis缓存 - 提高缓存命中率的配置与最佳实践
在现代大型语言模型(LLM)部署架构中,缓存系统扮演着至关重要的角色。随着LLM应用规模的不断扩大和用户需求的持续增长,如何构建高效、可靠的缓存架构成为系统性能优化的核心挑战。Redis作为业界领先的内存数据库,因其高性能、丰富的数据结构和灵活的配置选项,已成为LLM部署中首选的缓存解决方案。
|
3月前
|
存储 缓存 NoSQL
Redis专题-实战篇二-商户查询缓存
本文介绍了缓存的基本概念、应用场景及实现方式,涵盖Redis缓存设计、缓存更新策略、缓存穿透问题及其解决方案。重点讲解了缓存空对象与布隆过滤器的使用,并通过代码示例演示了商铺查询的缓存优化实践。
207 1
Redis专题-实战篇二-商户查询缓存